DEV Community

Osama Qarem

Advanced Async Logic with Redux Observable

Imagine that you're building a social network for cats. To register, a cat has to prove that it is who it says it is by uploading a selfie and a photo of its paw. Our backend runs both images through its billion-dollar FelineCatusML™ machine learning model and verifies that the pawprint and face belong to the same furball by checking them against legal cat records. If we believe kitty is playing tricks, we terminate the sign-up process.

For example, if a cat says that it's a brown Munchkin called Peanut, but the backend returned name: 'Peanut' for the selfie and name: 'Sir Williams' for the pawprint, registration will fail because the names must match.

This is the flow of our imaginary app. Each step is a different page/screen:

  1. Cat takes paw photo (API call #1).

  2. Cat starts filling out the form.

  3. Cat takes selfie (API call #2).

  4. Check the outcome of API calls #1 & #2 and determine if sign up is successful.

Here are some optimistic assumptions:

Because the first API call happens first, it completes first.

But in the real world, either one may complete first.

The user will wait patiently staring at loading indicators while any API calls are running :)

The user will want to be able to cancel whatever is happening on their screen whenever they want to. Especially cat users. Cats have very short attention spans and cannot stare at loading indicators for long.

Our apps need to be able to handle process cancellation. That means stopping all operations that are no longer needed. Not only are those considered memory leaks, but they can lead to very unexpected behavior. Imagine if a cat goes through step 1, then cancels (API call #1 still running) then comes back and does step 1 again. You can rarely ever predict what a cat is up to. It would not be out of the question to assume that we now have 2 API calls racing to update the same piece of state. And we cannot guarantee which one will finish first. So what is our next state? Unknown.
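To make that race concrete, here is a minimal, dependency-free sketch. Everything in it (fakeRequest, createTracker, the state shape) is hypothetical and only stands in for real network calls. It shows a stale response overwriting a newer one, and a simple "latest request wins" guard, which is roughly the behavior switchMap() gives us later on.

```javascript
// Hypothetical stand-in for a network call: a handle we can resolve by hand,
// so the order of completion is fully under our control.
function fakeRequest() {
  let callback = null;
  return {
    then(cb) { callback = cb; },
    resolve(value) { if (callback) callback(value); }
  };
}

function createTracker() {
  const state = { name: null };
  let latestId = 0;
  return {
    state,
    // Naive: whichever response arrives last overwrites the state.
    trackNaive(request) {
      request.then(name => { state.name = name; });
    },
    // Guarded: only the most recently started request may update the state.
    trackLatest(request) {
      const id = ++latestId;
      request.then(name => { if (id === latestId) state.name = name; });
    }
  };
}

// The cat submits a paw photo, cancels, then submits again.
const naive = createTracker();
const naiveFirst = fakeRequest();
const naiveSecond = fakeRequest();
naive.trackNaive(naiveFirst);
naive.trackNaive(naiveSecond);
naiveSecond.resolve("Peanut");       // the newer call finishes first
naiveFirst.resolve("Sir Williams");  // the stale call finishes last and wins

const guarded = createTracker();
const guardedFirst = fakeRequest();
const guardedSecond = fakeRequest();
guarded.trackLatest(guardedFirst);
guarded.trackLatest(guardedSecond);
guardedSecond.resolve("Peanut");
guardedFirst.resolve("Sir Williams"); // ignored: it is no longer the latest

console.log(naive.state.name);   // "Sir Williams"
console.log(guarded.state.name); // "Peanut"
```

Note that the guard only ignores the stale result. The stale request itself keeps running, which is why real cancellation is still worth having.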

Of course, we could make the process non-cancellable: "Either you finish this right now, or you exit my app/close the browser." But that is not a good user experience. We must aim for flexible applications. They are more pleasant to use.

Our API calls are probably happening in our redux actions. Besides process cancellation, how can we be notified when our app is in the following state:

  • Both API calls #1 & #2 have successfully returned results that are non-errors (remember, both calls are happening concurrently).

How can we handle cancellation and order of completion elegantly without writing too much code that is hard to follow?

We have options for Redux. There are several middleware libraries for handling complex asynchronous logic, all well covered in the docs (best docs ever, by the way) [1] [2]. You can use whichever one you are comfortable with to deal with similar and more complex challenges. I seriously considered Redux Saga as it is the most popular, but I am not as familiar with JavaScript generators as I am with RxJs, so I went with Redux Observable.

Redux-Observable

As you can probably tell from the name, this library makes use of observables, particularly from RxJs. In fact, the API of redux-observable is about 3-4 functions. Most of the code we will write will actually be from RxJs.

🛑 If you are not familiar with Rx, then you will likely face difficulty following the code in this article.

Brief intro using promises

Official

With redux-observable we create epics. An epic is a function that takes in two arguments. The first one is a stream of actions running through your reducers.

  // Redux action payload shape.
  type Action = {type: string; payload?: any}

  // Actions stream. An observable that emits every dispatched action.
  actions$ : ActionsObservable<Action>

When you dispatch an action, it runs through your reducers, then your epics. Usually, an epic is set to execute when a specific action type is dispatched.
The second argument is the stream of our store's state. This means that an epic may access the store's current state. We won't need that here.
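The ordering described above (reducers first, then epics, with the latest state available) can be illustrated with a toy store. This is not Redux or redux-observable, and createToyStore and epicLikeListener are made-up names. It only mimics the order of operations.

```javascript
// A toy store, NOT Redux itself: it only mimics the order of operations.
function createToyStore(reducer, epicLikeListener) {
  let state = reducer(undefined, { type: "@@INIT" });
  return {
    getState: () => state,
    dispatch(action) {
      // 1. The reducer sees the action first and produces the next state.
      state = reducer(state, action);
      // 2. Only then is the listener told about it, with the updated state.
      epicLikeListener(action, state);
      return action;
    }
  };
}

const log = [];
const counter = (state = 0, action) =>
  action.type === "INCREMENT" ? state + 1 : state;

const toyStore = createToyStore(counter, (action, state) => {
  log.push(`${action.type} seen with state ${state}`);
});

toyStore.dispatch({ type: "INCREMENT" });
console.log(log[0]); // "INCREMENT seen with state 1"
```

By the time the listener runs, the state already reflects the action. The same holds for a real epic that reads from the state stream.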

To keep this article short, I'll leave installing and setting up the library to you, as you can follow the instructions on the official docs.

Epics 🔥

Our API calls will happen in our epics. We want to execute the first API call as soon as we have the necessary data. Therefore, we will create separate epics for each API call.

1. getNameForPawEpic() will make the first API call.
2. getNameForFaceEpic() will make the second API call.

We will also have a third epic:

3. verificationEpic() will run when both epics above have dispatched success actions.

// Action creator
export const getNameForPaw = pawPhotoBase64 => ({
  type: GET_NAME_FOR_PAW,
  payload: pawPhotoBase64
});

This action creator is called from our component. The dispatched action runs through our reducers, but no reducer matches its type, so it does not change our store's state. Its only purpose is to trigger our first epic, which listens for this action type.
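The article does not show how the action type constants are defined. A minimal sketch, assuming plain string constants (the values here are an assumption; any unique strings would do), could look like this:

```javascript
// Hypothetical action type constants. The string values are assumptions.
const GET_NAME_FOR_PAW = "GET_NAME_FOR_PAW";
const GET_NAME_FOR_PAW_SUCCESS = "GET_NAME_FOR_PAW_SUCCESS";
const GET_NAME_FOR_PAW_ERROR = "GET_NAME_FOR_PAW_ERROR";
const CANCEL = "CANCEL";

// Same action creator as above, repeated so this block runs on its own.
const getNameForPaw = pawPhotoBase64 => ({
  type: GET_NAME_FOR_PAW,
  payload: pawPhotoBase64
});

// A component would dispatch the result, e.g. dispatch(getNameForPaw(photo)).
const pawAction = getNameForPaw("aGVsbG8=");
console.log(pawAction.type);    // "GET_NAME_FOR_PAW"
console.log(pawAction.payload); // "aGVsbG8="
```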

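// Epic
import { of } from "rxjs";
import { ajax } from "rxjs/ajax";
import { catchError, map, switchMap, takeUntil } from "rxjs/operators";
import { ofType } from "redux-observable";

export const getNameForPawEpic = actions$ =>
  actions$.pipe(
    ofType(GET_NAME_FOR_PAW),
    switchMap(({ payload }) => {
      const request = {
        url: "http://api.felinecatus.com/pawprint/verification",
        method: "POST",
        headers: {
          "content-type": "application/json"
        },
        body: {
          pawPhotoBase64: payload
        }
      };

      return ajax(request).pipe(
        map(res => {
          // AjaxResponse exposes the HTTP status as `status` and the parsed
          // body as `response`; this article treats the body as the name.
          if (res.status === 200) {
            return {
              type: GET_NAME_FOR_PAW_SUCCESS,
              payload: res.response
            };
          } else {
            return {
              type: GET_NAME_FOR_PAW_ERROR,
              payload: `Unexpected status ${res.status}`
            };
          }
        }),
        // Abort the request as soon as a CANCEL action is dispatched.
        takeUntil(actions$.pipe(ofType(CANCEL))),
        // Network failures and non-2xx responses arrive here as errors.
        catchError(err => {
          return of({
            type: GET_NAME_FOR_PAW_ERROR,
            payload: err.message
          });
        })
      );
    })
  );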

So what is our epic doing? It is:

1. Listening to a stream of actions using a redux-observable helper function ofType() which filters by the type we specify.

2. When GET_NAME_FOR_PAW action is dispatched, the epic will let that action through.

3. We switchMap() our action. This is simply the safe choice when mapping async actions. Since we don't want this action somehow executing more than once at a time, it will 'switch' to the most recent call, and map the result to what follows.

We destructure the payload from our action and create the API call using the ajax() function from RxJs. We map the result to either a success or an error action. What is to be done with the error is left to our component.

Epics take in a stream of actions and emit plain action objects. That is why we return action objects rather than dispatching anything ourselves.

   //  In
   actions$: ActionsObservable<Action>

   // Out. Passes through our reducers.
   action:  Action

4. We have an action type called CANCEL which, when dispatched, cancels all of our network requests. We might dispatch it when the user navigates away, to abort the process. This works because takeUntil() listens to the stream of actions and completes our ajax observable as soon as the cancellation action type comes through.

5. catchError() does what its name says. In case something unexpected happens, we can handle it here. It must return an observable, though, which is why we wrap the returned action in of().
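The success and error actions emitted by the epic still need a reducer so that our component can react to them. The article does not show one, so the following is only a sketch under assumptions: the state shape (pawName, pawError), the reducer name and the reset on CANCEL are all hypothetical.

```javascript
// Hypothetical state shape and reducer; the action types match the epic above.
const initialPawState = { pawName: null, pawError: null };

function pawReducer(state = initialPawState, action) {
  switch (action.type) {
    case "GET_NAME_FOR_PAW_SUCCESS":
      return { pawName: action.payload, pawError: null };
    case "GET_NAME_FOR_PAW_ERROR":
      return { pawName: null, pawError: action.payload };
    case "CANCEL":
      // Assumption: cancelling wipes any partial result.
      return initialPawState;
    default:
      // GET_NAME_FOR_PAW lands here: it only triggers the epic.
      return state;
  }
}

let pawState = pawReducer(undefined, { type: "GET_NAME_FOR_PAW", payload: "..." });
console.log(pawState.pawName); // null, the triggering action changes nothing

pawState = pawReducer(pawState, { type: "GET_NAME_FOR_PAW_SUCCESS", payload: "Peanut" });
console.log(pawState.pawName); // "Peanut"

pawState = pawReducer(pawState, { type: "CANCEL" });
console.log(pawState.pawName); // null again
```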

This is our action creator for our second epic:

// Action creator for second epic
export const getNameForFace = facePhotoBase64 => ({
  type: GET_NAME_FOR_FACE,
  payload: facePhotoBase64
});

The code for getNameForFaceEpic() is very similar to the first epic, except it listens to GET_NAME_FOR_FACE and dispatches GET_NAME_FOR_FACE_SUCCESS on success and GET_NAME_FOR_FACE_ERROR on error. So we will stick to pretending that we have written it 🙂.

Our first 2 epics combined act as the action creator of our third epic:

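// Third epic
import { zip } from "rxjs";
import { map, repeat, takeUntil } from "rxjs/operators";
import { ofType } from "redux-observable";

export const verificationEpic = actions$ => {
  // Success actions emitted by the first two epics.
  const paw$ = actions$.pipe(ofType(GET_NAME_FOR_PAW_SUCCESS));
  const face$ = actions$.pipe(ofType(GET_NAME_FOR_FACE_SUCCESS));
  // Emits a [face, paw] pair once both streams have produced a value.
  const combined$ = zip(face$, paw$);

  const cancel$ = actions$.pipe(ofType(CANCEL));

  return combined$.pipe(
    map(([face, paw]) => {
      // Registration passes only if both calls resolved to the same name.
      const verifiedKitty = face.payload === paw.payload;

      return { type: VERIFICATION_COMPLETE, payload: verifiedKitty };
    }),
    // Complete on CANCEL, discarding any half-matched pair...
    takeUntil(cancel$),
    // ...then resubscribe so the epic keeps working for the next attempt.
    repeat()
  );
};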

1. We create paw$ & face$ which are streams of actions filtered by the success types of the first 2 epics.

2. We combine those two using zip, which creates a new stream that emits only once both source streams have each emitted a value, pairing those values in order. This way, we get a success notification only when both calls succeed.

3. When both API calls succeed, we process our payloads and map the output to VERIFICATION_COMPLETE which runs through our reducers and updates our store's state. Our component handles the outcome.

Once one epic sends off its success action, our combined$ stream waits for the other stream to emit its success action. If the user cancels the process or the other request fails, that second success action never arrives. Therefore, we need to be able to reset our combined$ stream so that it listens for both success actions again.

4. We do takeUntil(cancel$). This will complete our observable returned by our epic when the CANCEL type comes through. Since we completed the outermost observable in our epic, it is now dead and no longer working.

5. To remedy this, we pipe the repeat() operator, which resubscribes to our combined$ observable as soon as it completes, making it listen to both face$ and paw$ from square one again.
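To see why the reset matters, here is a toy pairing buffer in plain JavaScript. It is not RxJs, and createPairer is a made-up helper. It only mimics how zip matches the nth value of one stream with the nth value of the other, and how cancelling throws away a half-matched pair, which is what takeUntil(cancel$) followed by repeat() achieves in the epic.

```javascript
// Toy pairing buffer (hypothetical, not RxJs): pairs values in arrival order.
function createPairer(onPair) {
  let faces = [];
  let paws = [];
  const flush = () => {
    while (faces.length > 0 && paws.length > 0) {
      onPair(faces.shift(), paws.shift());
    }
  };
  return {
    face(name) { faces.push(name); flush(); },
    paw(name) { paws.push(name); flush(); },
    // In the spirit of takeUntil(cancel$) + repeat(): drop anything
    // half-matched and start again from scratch.
    cancel() { faces = []; paws = []; }
  };
}

// With a reset on cancel.
const withReset = [];
const pairer = createPairer((face, paw) => withReset.push(face === paw));
pairer.paw("Sir Williams"); // first attempt: only the paw call succeeds
pairer.cancel();            // the cat walks away, the stale result is dropped
pairer.paw("Peanut");       // second attempt
pairer.face("Peanut");

// Without a reset, the stale paw result pairs with the new face result.
const withoutReset = [];
const leaky = createPairer((face, paw) => withoutReset.push(face === paw));
leaky.paw("Sir Williams");
leaky.paw("Peanut");
leaky.face("Peanut");

console.log(withReset);    // [ true ]
console.log(withoutReset); // [ false ]
```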


And that's how it's done! 🎉

Cancellable, flexible and concise async process management ✅ with redux. And we've only used a few RxJs operators. Remember, you can use more than one redux middleware at the same time. I am still using redux-thunk alongside redux-observable. Simple problems should still be solved with simple solutions.

Thank you for reading. I hope you found this useful.

Top comments (2)

Ibe Arua

If you are using Angular, can you show a code snippet of adding this to the ngRedux configure store parameter? Or do you have a link to the full code?

I enjoyed it!

Osama Qarem

Glad you liked it! Unfortunately I've never used Angular, but I hope this snippet will be of some help:

reducers/index.ts

import { combineReducers } from "redux";
import { combineEpics } from "redux-observable";
import {
  myEpic,
  myOtherEpic
} from "../actions";
import myReducer from "./myReducer";
import myOtherReducer from "./myOtherReducer";

// Pass epics.
// Actions will run through reducers first and then epics.
export const rootEpic = combineEpics(
  myEpic,
  myOtherEpic
);

// Pass reducers.
export default combineReducers({
  myReducer,
  myOtherReducer
});

store/index.ts

import { createStore, applyMiddleware } from "redux";
import thunk from "redux-thunk";
import reducers, { rootEpic } from "../reducers";
import { createEpicMiddleware } from "redux-observable";

const epicMiddleware = createEpicMiddleware();

const store = createStore(
  reducers,
  {},
  applyMiddleware(thunk, epicMiddleware)
);

epicMiddleware.run(rootEpic);

// Useful if using TypeScript (optional)
export type RootStoreType = ReturnType<typeof reducers>;

export default store;