Grzegorz Rozdzialik

Pitfall when updating Subjects synchronously

I have recently implemented a table state management solution using RxJS and a concept called synchronized state. In this post, I will share the lessons learned when it comes to updating RxJS Subjects synchronously.

Simplified table state

The table state consists of:

  1. Table data - rows
  2. Properties that the user controls - currentPage, pageSize
  3. Derived data - maxPage (based on rows and pageSize)

Notice that currentPage has to be synchronized: it must never exceed maxPage, yet the user can change it independently of the other properties.
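The derived value and the constraint described above can be written as two small pure functions. This is only a minimal sketch: the names computeMaxPage and clampPage are hypothetical, pages are assumed to be 1-based, and treating an empty table as having one page is an assumption rather than something this post specifies.

```typescript
// Hypothetical helpers; names and edge-case handling are assumptions.

/** Number of pages needed to show `rowCount` rows, `pageSize` rows per page. */
function computeMaxPage(rowCount: number, pageSize: number): number {
  // Assumption: pages are 1-based and an empty table still has one page.
  return Math.max(1, Math.ceil(rowCount / pageSize));
}

/** Pulls `currentPage` back into range when it exceeds `maxPage`. */
function clampPage(currentPage: number, maxPage: number): number {
  return Math.min(currentPage, maxPage);
}

console.log(computeMaxPage(10, 5)); // 2
console.log(computeMaxPage(10, 10)); // 1
console.log(clampPage(2, 1)); // 1
```

Keeping these two rules free of any stream logic makes them easy to check on their own before they are wired into observables.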

Modeling the state using RxJS

I decided to leverage the following BehaviorSubjects and observables to be responsible for the table state:

  1. rowsSubject
  2. pageSizeSubject
  3. maxPage$ (observable based on pageSizeSubject and rowsSubject)
  4. currentPageSubject

and then combine them all into tableState$ (a single observable). The graph of observables looks as follows:

[Figure: Table state streams graph]

To synchronize the currentPage state, I added a tap operator on top of tableState$ that updates currentPage whenever a change in maxPage leaves it out of range.

Play around with the initial values or emit new values from the subjects.

Disclaimer: In a real-world application with multiple subscribers to tableState$, you probably want to add a shareReplay(1) operator at the end to avoid synchronizing the state multiple times.

Pitfall - emitting invalid states last

Notice what happens when you add the following line at the end of the code above:

pageSizeSubject.next(10);

This means that all 10 rows can fit on a single page, so maxPage should change to 1, and so should currentPage. The expected order of changes looks as follows:

[Figure: Expected updates after changing page size]

First, maxPage$ is recalculated (1). Then tableState$ emits (2), and its tap updates currentPageSubject (3). Finally, tableState$ emits again (4) with the final value.

The result is different:

// ... previous emissions
{
  currentPage: 1,
  rows: [
    1, 2, 3, 4,  5,
    6, 7, 8, 9, 10
  ],
  pageSize: 10,
  maxPage: 1
}
{
  currentPage: 2,
  rows: [
    1, 2, 3, 4,  5,
    6, 7, 8, 9, 10
  ],
  pageSize: 10,
  maxPage: 1
}

Notice that the second to last state emitted by tableState$ is the right one (from step 4 in the image above) - both maxPage and currentPage are set correctly. However, the last emitted state is wrong - currentPage is not synchronized, as it is still set to 2 (from step 2 in the image above). It looks as if RxJS was forgetting about our state synchronization and then emitted the previous value with a smile on its face saying "Joke's on you". What is actually happening here?

To fully grasp the order of operations, let's take a look at tap's source code, particularly the _next handler:

  _next(value: T) {
    try {
      this._tapNext.call(this._context, value);
    } catch (err) {
      this.destination.error(err);
      return;
    }
    this.destination.next(value);
  }

In essence, this code first calls the callback we passed to tap (this._tapNext.call), and then emits the value (this.destination.next). In the callback, currentPageSubject is updated synchronously, and all operations that build up tableState$ are also synchronous. This means that everything triggered by the updated currentPage, including the emission of the corrected state (state 4 in the image), happens before execution returns from this._tapNext.call for the invalid state (state 2 in the image).

Thus, 4 is emitted before 2, leaving the subscribers with an invalid, desynchronized state.
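The same ordering can be reproduced without RxJS, which shows that it comes from the ordinary call stack rather than from anything special in the library. The sketch below is a dependency-free model, not the RxJS implementation: plain functions stand in for the subjects and for the tap pipeline, and all names (runTapModel, emit, setCurrentPage, setMaxPage) as well as the starting values are hypothetical.

```typescript
// Model only: plain functions stand in for the subjects and the tap pipeline.
type TapModelState = { currentPage: number; maxPage: number };

function runTapModel(): TapModelState[] {
  const received: TapModelState[] = []; // what a subscriber would see, in order
  let currentPage = 2;
  let maxPage = 2;

  // Stands in for tableState$ with tap: build the state, run the callback,
  // then forward the state that was built BEFORE the callback ran.
  function emit(): void {
    const state: TapModelState = { currentPage, maxPage };
    if (state.currentPage > state.maxPage) {
      setCurrentPage(state.maxPage); // re-enters emit() synchronously
    }
    received.push(state); // plays the role of this.destination.next(value)
  }

  function setCurrentPage(page: number): void {
    currentPage = page;
    emit();
  }

  function setMaxPage(page: number): void {
    maxPage = page;
    emit();
  }

  setMaxPage(1); // models the page size change that shrinks maxPage to 1
  return received;
}

console.log(runTapModel().map((state) => state.currentPage)); // [ 1, 2 ]
```

The inner call to emit() finishes and delivers the corrected state before the outer call reaches its own push, so the stale state with currentPage 2 arrives last.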

The fix

The fix, in this case, is very small and it involves changing the operator. As you can see, what we do not want is the invalid state to be emitted last. Actually, we do not want invalid states to be emitted at any time.

This means that we can use the filter operator to prevent emitting invalid states.

When do we know that some state is invalid? When we need to synchronize it.

This means that we need to replace our existing use of tap with the filter operator:

  filter((state) => {
    if (state.currentPage > state.maxPage) {
      currentPageSubject.next(state.maxPage);
      return false;
    }

    return true;
  })

Now, no invalid state is emitted, and the state is synchronized as expected.
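To see why dropping the invalid state is enough, the filter behavior can be modeled with plain functions. This is a dependency-free sketch under the same kind of assumptions as any model: it is not RxJS code, and the names runFilterModel, emit, setCurrentPage and setMaxPage, along with the starting values, are hypothetical.

```typescript
// Model only: plain functions stand in for the subjects and the filter pipeline.
type FilterModelState = { currentPage: number; maxPage: number };

function runFilterModel(): FilterModelState[] {
  const received: FilterModelState[] = []; // what a subscriber would see
  let currentPage = 2;
  let maxPage = 2;

  // Stands in for tableState$ with filter: an invalid state triggers the
  // correction and is then dropped instead of being forwarded.
  function emit(): void {
    const state: FilterModelState = { currentPage, maxPage };
    if (state.currentPage > state.maxPage) {
      setCurrentPage(state.maxPage); // the corrected state is emitted from here
      return; // like returning false from the filter callback
    }
    received.push(state);
  }

  function setCurrentPage(page: number): void {
    currentPage = page;
    emit();
  }

  function setMaxPage(page: number): void {
    maxPage = page;
    emit();
  }

  setMaxPage(1); // models the page size change that shrinks maxPage to 1
  return received;
}

console.log(runFilterModel().map((state) => state.currentPage)); // [ 1 ]
```

The correction still runs re-entrantly, but because the outer call returns without forwarding anything, the corrected state is the only one that reaches the subscriber.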

Bonus - having access to previous state during synchronization

Some state synchronization may need to happen when some property changes without any additional condition. This is when we need to compare the previous state and the current state to see if that property changed and synchronize the state.

In the current solution, there is no access to the previous state inside the tap/filter operators.

Solution - pairwise

One way to solve it is using more RxJS operators.

The pairwise operator gives us access to the current and the previous value in the observable, which is exactly what we want.

However, pairwise does not emit until it has received at least 2 values. That is because it needs to emit the current and the previous value, and it cannot emit just the current one.

To solve that, we can use the startWith operator and inject an empty object ({}) as the initial value. This way, on the initial emission pairwise emits an array of the form [previous, current], where previous is the empty object and current is the first real state.

Remember to transform the array back to a single value using the map (or pluck) operator, to avoid emitting both previous and current value from the returned observable.

The code below implements this solution. Notice that we have access to both previousState and state in the filter operator's callback (the one that is supposed to synchronize the state).

Tip: pluck(1) is the same as map(([_, state]) => state). Note that pluck is deprecated as of RxJS 7, so map is the safer choice in newer code.
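The pairing itself can be modeled without RxJS to see exactly what the synchronizing callback receives. The sketch below is a dependency-free model of startWith({}) followed by pairwise, applied to a plain array of states. The names Snapshot, withPrevious and pageSizeChanged are hypothetical, and watching pageSize is just an illustrative choice of property.

```typescript
// Model only: mimics startWith({}) + pairwise() over a plain array of states.
type Snapshot = { currentPage: number; pageSize: number };

function withPrevious(states: Snapshot[]): Array<[Partial<Snapshot>, Snapshot]> {
  let previous: Partial<Snapshot> = {}; // plays the role of startWith({})
  return states.map((state) => {
    const pair: [Partial<Snapshot>, Snapshot] = [previous, state];
    previous = state;
    return pair;
  });
}

// Hypothetical check: did pageSize change between two consecutive states?
function pageSizeChanged(previous: Partial<Snapshot>, state: Snapshot): boolean {
  return previous.pageSize !== state.pageSize;
}

const pairs = withPrevious([
  { currentPage: 2, pageSize: 5 },
  { currentPage: 2, pageSize: 5 },
  { currentPage: 2, pageSize: 10 },
]);

console.log(pairs.map(([previous, state]) => pageSizeChanged(previous, state)));
// [ true, false, true ]
```

With the empty-object seed, every property looks changed on the first emission, so a synchronization rule that should skip the initial state needs an explicit check for that case.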

Conclusion

RxJS is really powerful and allows for writing complex behaviors in a few lines of code using composable operators. However, keep in mind that most operations are synchronous, so they run in the same order you would expect from regular function calls. This means that when a source is updated from inside the pipeline, the whole pipeline can run and emit a new value before the previous one is emitted, resulting in unwanted behavior.

The fix is to use the filter operator to prevent the unwanted value from being emitted.
