Gautam Vaja for CodeParrot

RxJS and Redux-Observable

Redux-Observable is a middleware for Redux that uses RxJS to handle asynchronous actions. It offers an alternative to redux-thunk and redux-saga, allowing you to work with async actions using observables.

Understanding the Observer Pattern

Before diving into RxJS and Redux-Observable, let's revisit the Observer Pattern. In this pattern, an "Observable" object maintains a list of "Observers". When the Observable's state changes, it notifies all its Observers.



document.addEventListener("click", (event) => {
  console.log("Element clicked:", event);
});



In this example, the document plays the role of the Observable: addEventListener registers the callback function as an Observer, and the browser calls it on every click.
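
To make the pattern concrete without relying on the DOM, here is a minimal hand-rolled sketch. The names createObservable, subscribe, and notify are hypothetical and chosen only for illustration; this is not how RxJS implements Observables.

```javascript
// A toy Observable: keeps a set of observer callbacks and notifies each one.
// createObservable, subscribe and notify are illustrative names, not a real API.
function createObservable() {
  const observers = new Set();
  return {
    // Register an observer; returns a function that removes it again.
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer);
    },
    // Push a new value to every registered observer.
    notify(value) {
      observers.forEach((observer) => observer(value));
    },
  };
}

const clicks = createObservable();
const received = [];

const unsubscribe = clicks.subscribe((value) => received.push(`A:${value}`));
clicks.subscribe((value) => received.push(`B:${value}`));

clicks.notify("first");
unsubscribe(); // observer A stops listening
clicks.notify("second");

console.log(received); // [ 'A:first', 'B:first', 'B:second' ]
```

Returning an unsubscribe function from subscribe mirrors the idea of a subscription that can be cancelled, which RxJS builds on.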

Diving into RxJS

RxJS (Reactive Extensions for JavaScript) is a library for composing asynchronous and event-based programs using observable sequences. It extends the Observer pattern by providing operators that allow you to compose Observables in a declarative manner.

Key Concepts in RxJS

  • Observers: Objects that subscribe to Observables and receive notifications.
  • Observables: Objects that emit data over time.
  • Operators: Functions that allow you to manipulate Observables.
  • Subjects: Special types of Observables that are also Observers.

Observers and Observables

Observers

Observers are objects that can subscribe to Observables and receive notifications of three types: next, error, and complete. Here's a basic example of an Observer in action:



import { Observable } from "rxjs";

const observable = new Observable((subscriber) => {
  subscriber.next("Hello");
  subscriber.next("World");
  subscriber.complete();
});

const observer = {
  next: (value) => console.log("Received value:", value),
  error: (err) => console.error("Error:", err),
  complete: () => console.log("Completed"),
};

observable.subscribe(observer);



Expected Output:



Received value: Hello
Received value: World
Completed



Observables

Observables emit data over time and can be created using the new Observable constructor. Here’s an example where an Observable emits values periodically:



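import { Observable } from "rxjs";

const observable = new Observable((subscriber) => {
  let count = 1;
  const intervalId = setInterval(() => {
    subscriber.next(count++);
    if (count > 5) {
      clearInterval(intervalId);
      subscriber.complete();
    }
  }, 1000);

  // Teardown: stop the timer if the subscriber unsubscribes early.
  return () => clearInterval(intervalId);
});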

observable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log("Completed"),
});



Expected Output:



1
2
3
4
5
Completed



Subjects

A Subject is a special type of Observable that is also an Observer: you push values into it by calling next, and it multicasts each value to all of its subscribers. Here’s an example:



import { Subject } from "rxjs";

const subject = new Subject();

subject.subscribe({
  next: (value) => console.log(`Observer 1: ${value}`),
});

subject.subscribe({
  next: (value) => console.log(`Observer 2: ${value}`),
});

subject.next("Hello");
subject.next("World");



Expected Output:



Observer 1: Hello
Observer 2: Hello
Observer 1: World
Observer 2: World



Note:

  • Plain Observables are unicast: each subscription triggers its own independent execution of the producer.
  • Subjects are multicast: all subscribers share a single execution and receive the same values.
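
The difference can be seen in a small toy model. The helpers toyObservable and toySubject below are hypothetical stand-ins written for this illustration; they only mimic the unicast and multicast behavior and are not the RxJS implementation.

```javascript
// Toy model only: these helpers mimic the unicast/multicast distinction.

// Unicast: the producer function runs once per subscriber.
function toyObservable(producer) {
  return { subscribe: (next) => producer(next) };
}

// Multicast: one shared list of subscribers receives every pushed value.
function toySubject() {
  const subscribers = [];
  return {
    subscribe: (next) => subscribers.push(next),
    next: (value) => subscribers.forEach((fn) => fn(value)),
  };
}

let producerRuns = 0;
const cold = toyObservable((next) => {
  producerRuns += 1; // counts how many independent executions were started
  next(`run ${producerRuns}`);
});

const unicastLog = [];
cold.subscribe((v) => unicastLog.push(`A got ${v}`));
cold.subscribe((v) => unicastLog.push(`B got ${v}`));

const multicastLog = [];
const subject = toySubject();
subject.subscribe((v) => multicastLog.push(`A got ${v}`));
subject.subscribe((v) => multicastLog.push(`B got ${v}`));
subject.next("shared");

console.log(unicastLog);   // [ 'A got run 1', 'B got run 2' ]
console.log(multicastLog); // [ 'A got shared', 'B got shared' ]
```

In the unicast case the producer ran twice, once per subscriber; in the multicast case a single pushed value reached both subscribers.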

What Are Operators?

Operators are functions that allow you to manipulate and transform Observables. Here are some examples:

Creation Operators

from

Creates an Observable from an array:



import { from } from "rxjs";

const observable = from([1, 2, 3, 4]);

observable.subscribe((value) => console.log(value));



Expected Output:



1
2
3
4



Pipeable Operators

map

Transforms each value emitted by the source Observable:



import { map } from "rxjs/operators";
import { of } from "rxjs";

const observable = of(1, 2, 3, 4).pipe(map((value) => value * 2));

observable.subscribe((value) => console.log(value));



Expected Output:



2
4
6
8



filter

Filters the emitted values based on a condition:



import { filter } from "rxjs/operators";
import { of } from "rxjs";

const observable = of(1, 2, 3, 4, 5).pipe(filter((value) => value % 2 === 0));

observable.subscribe((value) => console.log(value));



Expected Output:



2
4


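
A pipeable operator is a function that takes a source and returns a new one, and pipe chains such functions from left to right. The sketch below shows that idea with plain arrays standing in for Observables; toyPipe, toyMap, and toyFilter are hypothetical names for this illustration and not RxJS functions.

```javascript
// Toy model: arrays stand in for Observables.
// Each "operator" is a function that takes a source and returns a new one.
const toyMap = (fn) => (source) => source.map(fn);
const toyFilter = (predicate) => (source) => source.filter(predicate);

// Apply the operators one after another, left to right.
const toyPipe = (source, ...operators) =>
  operators.reduce((current, operator) => operator(current), source);

const result = toyPipe(
  [1, 2, 3, 4, 5],
  toyFilter((value) => value % 2 === 0), // keeps 2 and 4
  toyMap((value) => value * 2) // doubles them
);

console.log(result); // [ 4, 8 ]
```

Unlike arrays, real Observables deliver their values over time, but the way operators compose is the same.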

mergeMap

Maps each value to an Observable and flattens the inner Observables:



import { mergeMap } from "rxjs/operators";
import { of } from "rxjs";

const observable = of("Hello", "World").pipe(
  mergeMap((value) => of(`${value} RxJS`))
);

observable.subscribe((value) => console.log(value));



Expected Output:



Hello RxJS
World RxJS



switchMap

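Switches to a new inner Observable on each emission, unsubscribing from the previous one. In the example below each inner Observable is created with of, which emits and completes synchronously, so there is never anything left to cancel; the cancellation becomes visible with slow inner Observables such as HTTP requests: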



import { switchMap } from "rxjs/operators";
import { interval, of } from "rxjs";

const observable = interval(1000).pipe(
  switchMap((value) => of(`Switched to ${value}`))
);

observable.subscribe((value) => console.log(value));



Expected Output:



Switched to 0
Switched to 1
Switched to 2
... (continues every second)



Setting Up Redux-Observable

To start using Redux-Observable, you need to install the necessary packages:



npm install redux-observable rxjs



Creating an Epic

An epic is a function that takes a stream of actions and returns a stream of actions. Let's start with a basic example:



import { ofType } from "redux-observable";
import { mapTo } from "rxjs/operators";

const pingEpic = (action$) =>
  action$.pipe(ofType("PING"), mapTo({ type: "PONG" }));

export default pingEpic;



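Here, when a PING action is dispatched, the epic picks it out of the action stream and emits a PONG action in response. Epics do not block or swallow actions: the PING action still reaches the reducers before the epic sees it.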
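
The "actions in, actions out" shape of an epic can be sketched without any library. In the toy model below an array of actions stands in for the action$ stream; toyPingEpic is a hypothetical stand-in for illustration and not the redux-observable API.

```javascript
// Toy model: an array of actions stands in for the action$ stream.
const toyPingEpic = (actions) =>
  actions
    .filter((action) => action.type === "PING") // plays the role of ofType("PING")
    .map(() => ({ type: "PONG" })); // plays the role of the mapping step

const dispatched = [{ type: "PING" }, { type: "OTHER" }, { type: "PING" }];
const emitted = toyPingEpic(dispatched);

console.log(emitted); // [ { type: 'PONG' }, { type: 'PONG' } ]
```

Only the two PING actions produce output; the unrelated action is ignored by this epic.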

Integrating the Epic with Redux



import { createStore, applyMiddleware } from "redux";
import { createEpicMiddleware } from "redux-observable";
import rootReducer from "./reducers";
import pingEpic from "./epics";

const epicMiddleware = createEpicMiddleware();

const store = createStore(rootReducer, applyMiddleware(epicMiddleware));

epicMiddleware.run(pingEpic);


  • createEpicMiddleware(): Creates the middleware that connects epics to the Redux store.
  • applyMiddleware(epicMiddleware): Applies the epic middleware to your Redux store.
  • epicMiddleware.run(pingEpic): Starts the epic so it begins receiving dispatched actions. Call it after the store has been created.

Once the store is set up, a dispatched PING action reaches the reducers first; pingEpic then sees it and emits a PONG action, which the middleware dispatches.

Handling AJAX Requests with Redux-Observable

Let's take a more practical example where we fetch user data from an API. We'll define action creators, create an epic to handle the AJAX request, and update the reducer to process the actions.

Action Creators

First, define action creators for starting the fetch and handling the response:



export const fetchUser = () => ({ type: "FETCH_USER" });
export const fetchUserFulfilled = (payload) => ({
  type: "FETCH_USER_FULFILLED",
  payload,
});



Epic for AJAX Request

Create an epic to handle the AJAX request:



import { of } from "rxjs";
import { ofType } from "redux-observable";
import { ajax } from "rxjs/ajax";
import { mergeMap, map, catchError } from "rxjs/operators";
import { fetchUserFulfilled } from "./actions";

const fetchUserEpic = (action$) =>
  action$.pipe(
    ofType("FETCH_USER"),
    mergeMap(() =>
      ajax.getJSON("/api/user").pipe(
        map((response) => fetchUserFulfilled(response)),
        catchError(() => of({ type: "FETCH_USER_FAILED" }))
      )
    )
  );

export default fetchUserEpic;


  1. ofType('FETCH_USER'): Filters the actions to only include those with the type 'FETCH_USER'.
  2. ajax.getJSON('/api/user'): Makes an AJAX request to fetch user data from the /api/user endpoint.
  3. map(response => fetchUserFulfilled(response)): Maps the AJAX response to a FETCH_USER_FULFILLED action.
  4. catchError(() => of({ type: 'FETCH_USER_FAILED' })): Catches any errors during the AJAX request and maps them to a FETCH_USER_FAILED action.

When a FETCH_USER action is dispatched, the epic makes an AJAX request. If the request is successful, a FETCH_USER_FULFILLED action is dispatched with the response data. If the request fails, a FETCH_USER_FAILED action is dispatched.

Combining Epics

If you have multiple epics, combine them using combineEpics:



import { combineEpics } from "redux-observable";
import fetchUserEpic from "./fetchUserEpic";

const rootEpic = combineEpics(fetchUserEpic);

export default rootEpic;



Updating the Reducer

Update your reducer to handle the new actions:



const initialState = {
  user: null,
};

const userReducer = (state = initialState, action) => {
  switch (action.type) {
    case "FETCH_USER_FULFILLED":
      return { ...state, user: action.payload };
    default:
      return state;
  }
};

export default userReducer;



When a FETCH_USER_FULFILLED action is dispatched, the user data in the state is updated with the fetched data. The reducer shown here does not handle FETCH_USER_FAILED, so a failed request leaves the state unchanged; to surface the failure, add a case that stores an error message in the state.
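
Because reducers are pure functions, they can be exercised by hand without a store. The sketch below is a variant of the reducer above that also records failures; the error field, the fixed message text, and the sample user are assumptions made for illustration and are not part of the original code.

```javascript
// Variant of the user reducer with an assumed `error` field.
const initialState = { user: null, error: null };

const userReducer = (state = initialState, action) => {
  switch (action.type) {
    case "FETCH_USER_FULFILLED":
      return { ...state, user: action.payload, error: null };
    case "FETCH_USER_FAILED":
      // The failure action in this article carries no payload,
      // so a fixed message is used here (an assumption).
      return { ...state, error: "Failed to fetch user" };
    default:
      return state;
  }
};

// Call the reducer directly, passing undefined to get the initial state.
const afterSuccess = userReducer(undefined, {
  type: "FETCH_USER_FULFILLED",
  payload: { id: 1, name: "Ada" }, // made-up sample data
});
const afterFailure = userReducer(afterSuccess, { type: "FETCH_USER_FAILED" });

console.log(afterSuccess); // { user: { id: 1, name: 'Ada' }, error: null }
console.log(afterFailure); // { user: { id: 1, name: 'Ada' }, error: 'Failed to fetch user' }
```

Whether a failure should also clear the previously loaded user is a design choice; this sketch keeps it.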

Practical Use Cases for Redux-Observable

Debouncing API Requests

Let's say you want to provide autocomplete suggestions as the user types. Instead of making an API call for every keystroke, you can debounce the requests.



import { of } from "rxjs";
import { ajax } from "rxjs/ajax";
import { ofType } from "redux-observable";
import { debounceTime, switchMap, map, catchError } from "rxjs/operators";

const searchEpic = (action$) =>
  action$.pipe(
    ofType("SEARCH"),
    // Wait for 500 ms of silence before passing the latest SEARCH action on.
    debounceTime(500),
    switchMap((action) =>
      // encodeURIComponent keeps spaces and "&" in the query from breaking the URL.
      ajax.getJSON(`/api/search?q=${encodeURIComponent(action.payload)}`).pipe(
        map((response) => ({ type: "SEARCH_FULFILLED", payload: response })),
        catchError(() => of({ type: "SEARCH_FAILED" }))
      )
    )
  );

export default searchEpic;
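
To see which keystrokes survive a 500 ms debounce, the timing rule can be simulated on a list of timestamped events. simulateDebounce is a hypothetical helper written for this illustration, not part of RxJS, and the keystroke timings are made up.

```javascript
// Toy simulation of debounce semantics on timestamped events.
// `events` must be sorted by time (in ms). An event survives only if
// no other event arrives less than `wait` ms after it.
function simulateDebounce(events, wait) {
  return events
    .filter((event, index) => {
      const next = events[index + 1];
      return next === undefined || next.time - event.time >= wait;
    })
    .map((event) => event.value);
}

// "r", "rx", "rxj" typed quickly, then a pause, then "rxjs".
const keystrokes = [
  { time: 0, value: "r" },
  { time: 100, value: "rx" },
  { time: 250, value: "rxj" },
  { time: 900, value: "rxjs" },
];

console.log(simulateDebounce(keystrokes, 500)); // [ 'rxj', 'rxjs' ]
```

Four keystrokes result in two searches instead of four, which is the saving that debouncing is meant to give.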




Cancelling Ongoing Requests

Using switchMap, you can cancel the previous request if a new one comes in:



import { of } from "rxjs";
import { ajax } from "rxjs/ajax";
import { ofType } from "redux-observable";
import { debounceTime, switchMap, map, catchError } from "rxjs/operators";

const searchEpic = (action$) =>
  action$.pipe(
    ofType("SEARCH"),
    debounceTime(500),
    // switchMap unsubscribes from the pending request when a new SEARCH arrives.
    switchMap((action) =>
      ajax.getJSON(`/api/search?q=${encodeURIComponent(action.payload)}`).pipe(
        map((response) => ({ type: "SEARCH_FULFILLED", payload: response })),
        catchError(() => of({ type: "SEARCH_FAILED" }))
      )
    )
  );

export default searchEpic;
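
Why cancelling matters can be shown with a small simulation that compares "cancel the previous request" (switchMap-like) with "let every request finish" (mergeMap-like). deliveredResponses is a hypothetical helper for this illustration, and the request timings are made up.

```javascript
// Toy simulation: which responses are delivered, and in which order.
// `requests` must be sorted by start time (in ms).
function deliveredResponses(requests, cancelPrevious) {
  return requests
    .filter((request, index) => {
      if (!cancelPrevious) return true;
      const next = requests[index + 1];
      // With cancellation, a response is delivered only if it arrives
      // before the next request starts.
      return next === undefined || request.start + request.duration < next.start;
    })
    // Responses arrive in order of their finish time.
    .sort((a, b) => a.start + a.duration - (b.start + b.duration))
    .map((request) => request.query);
}

const requests = [
  { query: "r", start: 0, duration: 400 }, // slow response
  { query: "rx", start: 200, duration: 100 }, // fast response
  { query: "rxjs", start: 1000, duration: 200 },
];

console.log(deliveredResponses(requests, false)); // [ 'rx', 'r', 'rxjs' ]
console.log(deliveredResponses(requests, true)); // [ 'rx', 'rxjs' ]
```

Without cancellation the stale response for "r" arrives after the newer one for "rx" and could overwrite it in the UI; with cancellation it is never delivered.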




Polling an API

You might need to poll an API to get updates regularly. Here's how you can do it:



import { interval, of } from "rxjs";
import { ajax } from "rxjs/ajax";
import { ofType } from "redux-observable";
import { switchMap, map, catchError } from "rxjs/operators";

const pollEpic = (action$) =>
  action$.pipe(
    ofType("START_POLLING"),
    // A new START_POLLING action restarts the timer.
    switchMap(() =>
      interval(5000).pipe(
        // Each tick starts a request; one still pending at the next tick is cancelled.
        switchMap(() =>
          ajax.getJSON("/api/data").pipe(
            map((response) => ({ type: "POLL_SUCCESS", payload: response })),
            catchError(() => of({ type: "POLL_FAILED" }))
          )
        )
      )
    )
  );

export default pollEpic;




Handling WebSocket Connections

Redux-Observable can also be used to manage WebSocket connections:



import { of } from "rxjs";
import { webSocket } from "rxjs/webSocket";
import { ofType } from "redux-observable";
import { switchMap, map, catchError } from "rxjs/operators";

const websocketEpic = (action$) =>
  action$.pipe(
    ofType("CONNECT_WEBSOCKET"),
    switchMap(() =>
      webSocket("ws://example.com").pipe(
        // Turn every incoming message into a Redux action.
        map((message) => ({ type: "WEBSOCKET_MESSAGE", payload: message })),
        catchError(() => of({ type: "WEBSOCKET_FAILED" }))
      )
    )
  );

export default websocketEpic;




Conclusion

Redux-Observable, powered by RxJS, provides a robust and flexible way to handle asynchronous actions in Redux applications. By embracing observables and functional programming, you can simplify your code and make it more maintainable.

Whether you're dealing with API calls, debouncing user input, managing WebSocket connections, or polling APIs, Redux-Observable offers powerful tools to manage these workflows efficiently.

If your application involves complex async workflows, give Redux-Observable a try. You might find it to be the perfect solution for your needs. For more detailed information and examples, check out the official Redux-Observable documentation.
