Suresh Mohan for Syncfusion, Inc.

Posted on • Originally published at syncfusion.com on

Functional Reactive Programming with Node.js Streams

Modern web applications use real-time events to provide a highly interactive user experience. As developers, it is essential to be aware of the tools and techniques to handle them appropriately.

Functional reactive programming (FRP) is a variant of reactive programming that appears to be one of the best solutions to address concerns with user experience. FRP supports complex data flows, making it easy to predictably develop and handle event streams.

In this article, I will take you through the basics of the FRP concept with Node.js. Then we’ll develop essential components to achieve FRP using only Node.js streams.

What is functional reactive programming (FRP)?

First, let’s understand what functional programming and reactive programming are:

  • Functional programming: The core idea of functional programming is that a function called with the same arguments always returns the same result. For that to hold, functions must not depend on or modify external state. The advantage of this technique is that it makes code easier to test, more predictable, and therefore, in theory, more reliable.
  • Reactive programming: Reactive programming is programming with asynchronous data streams. A program is considered reactive when a change in its input automatically propagates to its output without anyone having to update the output explicitly. As a result, reactive programming entails writing code that works with the flow of time and with events as they occur. It spares software engineers the burden of managing that propagation by hand.
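To make the functional half of this concrete, here is a minimal sketch. The names scale, scaleWithState, and currentMultiplier are made up for the illustration. The first function is pure, while the second depends on mutable state outside itself, so the same argument can produce different results.

```javascript
// Pure: the result depends only on the arguments.
const scale = (value, multiplier) => value * multiplier;

// Impure: the result also depends on mutable state outside the function.
let currentMultiplier = 100;
const scaleWithState = (value) => value * currentMultiplier;

console.log(scale(3, 100));     // 300
console.log(scale(3, 100));     // 300 again, always
console.log(scaleWithState(3)); // 300
currentMultiplier = 10;
console.log(scaleWithState(3)); // 30: same argument, different result
```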

The goal of FRP is to describe things like user inputs in a more direct, declarative manner by making their behavior more apparent. FRP works with two main categories of data: events and behaviors. An event is a value that occurs at a certain point in time, and a behavior is a value that changes continuously over time. When these concepts are combined effectively, the entire program becomes a collection of events and behaviors.
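The difference between the two can be sketched with Node's built-in EventEmitter. This is only an illustration under stated assumptions: createBehavior is a hypothetical helper written for this example, not part of Node.js or of any FRP library. The event is a discrete occurrence, and the behavior is a value that can be read at any time and that changes as events arrive.

```javascript
const { EventEmitter } = require('events');

// Event: a discrete occurrence that carries a value at one moment in time.
const clicks = new EventEmitter();

// Behavior: a value that always has a current reading and changes over time.
// Here it is derived from an event by folding each occurrence into a state.
const createBehavior = (emitter, eventName, reducer, initial) => {
  let current = initial;
  emitter.on(eventName, (payload) => {
    current = reducer(current, payload);
  });
  return { value: () => current };
};

const clickCount = createBehavior(clicks, 'click', (count) => count + 1, 0);

console.log(clickCount.value()); // 0
clicks.emit('click', { x: 10, y: 20 });
clicks.emit('click', { x: 15, y: 25 });
console.log(clickCount.value()); // 2
```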

There are many amazing FRP-inspired libraries, such as RxJS and Bacon.js. But we shouldn’t forget the Node stream interface for FRP and event streams. Sadly, popular FRP-inspired libraries don’t build on the Node stream interface. However, they offer functions like Rx.Node.fromStream() and Bacon.fromBinder() that can bring sources such as Node streams into their own stream types.

So, let’s see how to implement FRP using Node.js streams.

Need for Node streams in FRP

Let’s define a simple problem we want to solve. Assume that we fetch data from a database, in this case, a list of numbers. Later, we multiply each number by 100, filter the ones with a value higher than 500, and compute their sum.

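const MULTIPLIER = 100;

const sumOfBiggerNumbers = async () => {
  // DB is the application's data layer; it resolves to an array of { id, value } records.
  const numbers = await DB.getNumbersAsArray({ collection: Number });

  const sum = numbers
    .map((number) => number.value * MULTIPLIER) // First pass: multiply.
    .filter((value) => value > 500)             // Second pass: keep values above 500.
    .reduce((sum, value) => sum + value, 0);    // Third pass: add them up.

  return sum;
};
sumOfBiggerNumbers();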

Let’s look at some potential drawbacks of this method. First, everything is kept in memory: the entire array returned by the DB, plus the intermediate arrays created by map and filter. The memory cost can be enormous if there are millions of data points. Second, although map, filter, and reduce are helpful functions, together they iterate over the data set three times.

We can easily overcome such issues by using streams.

Introducing Node.js streams

Before moving forward with streams, let’s quickly recap what streams are.

Streams are one of the essential notions in Node.js applications. They efficiently manage reading and writing files, network communications, and end-to-end data transfer. For example, when you read a file without streams, the program loads the file from beginning to end into memory before processing it.

But, with streams, you read it piece by piece, digesting the content without having to retain it all in memory.
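As a rough sketch of what reading "piece by piece" looks like, the following uses fs.createReadStream with a small highWaterMark so that the file arrives in several chunks. The temporary file name and the readInChunks helper are assumptions made for this example only.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Create a small throwaway file so the example is self-contained.
const filePath = path.join(os.tmpdir(), 'frp-stream-example.txt');
fs.writeFileSync(filePath, 'a'.repeat(1000));

// Count chunks and bytes without ever holding the whole file in memory.
const readInChunks = (file, chunkSize) =>
  new Promise((resolve, reject) => {
    let chunks = 0;
    let bytes = 0;
    fs.createReadStream(file, { highWaterMark: chunkSize })
      .on('data', (chunk) => {
        chunks += 1;
        bytes += chunk.length;
      })
      .on('end', () => resolve({ chunks, bytes }))
      .on('error', reject);
  });

readInChunks(filePath, 256).then(({ chunks, bytes }) => {
  console.log(`Read ${bytes} bytes in ${chunks} chunks`);
});
```

Each chunk is at most 256 bytes here, so only a small part of the file is in memory at any moment.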

There are four types of streams:

  • Readable: You can only receive data from a readable stream. Data is buffered when pushed into a readable stream until a client begins to read it.
  • Writable: You can only send data to a writable stream.
  • Duplex: A readable and writable stream combined.
  • Transform: Similar to a duplex stream, except that the output is computed from the input.
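The four types can be tried out with the classes exported by Node's stream module. The following is a minimal sketch in object mode, and the variable names are chosen just for the illustration.

```javascript
const { Readable, Writable, Duplex, Transform } = require('stream');

// Readable: a source you read from. Pushed data is buffered until it is read.
const source = new Readable({ objectMode: true, read() {} });
source.push({ id: 1, value: 150 });
console.log(source.read()); // { id: 1, value: 150 }

// Writable: a sink you write to. This one collects what it receives.
const received = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    received.push(chunk);
    callback();
  },
});
sink.write(1);
console.log(received); // [ 1 ]

// Transform: a duplex stream whose output is derived from its input.
const double = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk * 2);
  },
});
double.write(21);
console.log(double.read()); // 42

// Every Transform is a Duplex, and a Duplex is both readable and writable.
console.log(double instanceof Duplex);   // true
console.log(double instanceof Readable); // true
console.log(double instanceof Writable); // true
```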

Let’s look at how we can solve the previously discussed problem with the help of streams.

Assume a function named DB.getNumbersAsStream that returns the same records as DB.getNumbersAsArray in the previous scenario, but as an object stream.

const stream = DB.getNumbersAsStream({ collection: Number });

stream.on('data', (number) => console.log(number));

// Output - each time the data callback is called
// {id:1, value: 150}
// {id:2, value: 65}

Then, create a transform stream to perform the multiplication operation. It multiplies the value of each number record by MULTIPLIER and emits the product. For this stream, we’ll use the simplified constructor, passing the transform function as an option instead of subclassing Transform. We use a transform stream because it can read data, transform it, and then write the changed data onward.

// Transform stream.
const { Transform } = require('stream');

const MULTIPLIER = 100;

const multiply = new Transform({
  objectMode: true,
  transform(number, encoding, callback) {
    callback(null, number.value * MULTIPLIER);
  },
});

The complete code looks like the following. The stream.pipe() method connects the readable stream to the writable side of the transform stream.

const { Transform } = require('stream');

const MULTIPLIER = 100;

const multiply = new Transform({
  objectMode: true,
  transform(number, encoding, callback) {
    callback(null, number.value * MULTIPLIER);
  },
});

const stream = DB.getNumbersAsStream({ collection: Number });

const transformedStream = stream.pipe(multiply);
transformedStream.on('data', (value) => console.log(value));

// This will be the output.
// 15000
// 6500

In the previous example, we implemented our map function .map((number) => number.value * MULTIPLIER) as a transform stream, and filter and reduce can be written in the same way. But it is not necessary to hand-write a new Transform for each operation. Instead, we can write generic helpers once, in one place, that hide the Transform instantiation.

Now, let’s see how to do it.

Map

const map = (fn, options = {}) => new Transform({ 
  objectMode: true,  
  ...options, 

  transform(chunk, encoding, callback) { 
     let num; 
     try { 
       num = fn(chunk); 
     } catch (e) { 
        return callback(e); 
     } 
     callback(null, num); 
  }, 
});

Filter

const filter = (fn, options = {}) => new Transform({  
  objectMode: true,  
  ...options,   

  transform(chunk, encoding, callback) {  
    let res;  
    try {  
      res = fn(chunk);  
    } catch (e) {  
       return callback(e);  
    }  
    return callback(null, res ? chunk : undefined);  
  },  
});

Reduce

const reduce = (fn, acc, options = {}) => new Transform({  
  objectMode: true,  
  ...options,   

  transform(chunk, encoding, callback) {  
    try {  
      acc = fn(acc, chunk);  
    } catch (e) {  
      return callback(e);  
    }   
    return callback();  
  },   
  flush(callback) {  
    callback(null, acc);  
  },  
});

While map and filter both emit values when the stream is running, reduce only emits at the end. We can’t predict the number of chunks (numbers) present in a stream, so we have to wait until the stream ends to emit the sum (reduced) value.
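The late emission of reduce can be observed directly. This sketch repeats the reduce helper from above so that it runs on its own, and it feeds it from Readable.from(), an in-memory source used here only for illustration. The 'data' listener fires exactly once, after the source has ended.

```javascript
const { Readable, Transform } = require('stream');

// Same shape as the reduce helper above, repeated so the sketch is standalone.
const reduce = (fn, acc, options = {}) => new Transform({
  objectMode: true,
  ...options,
  transform(chunk, encoding, callback) {
    try {
      acc = fn(acc, chunk);
    } catch (e) {
      return callback(e);
    }
    return callback(); // Nothing is emitted while chunks are still arriving.
  },
  flush(callback) {
    callback(null, acc); // The single result is emitted when the input ends.
  },
});

const emitted = [];
const done = new Promise((resolve, reject) => {
  Readable.from([1, 2, 3, 4])
    .pipe(reduce((sum, value) => sum + value, 0))
    .on('data', (value) => emitted.push(value))
    .on('end', () => resolve(emitted))
    .on('error', reject);
});

done.then((values) => console.log(values)); // [ 10 ]
```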

With map, filter, and reduce available as stream helpers, we can keep our business logic in plain functions and reuse it. The pipeline is then written in this manner.

const multiply = (number) => number.value * MULTIPLIER;
const greaterThan500 = (value) => value > 500;
const sum = (sum, value) => value + sum;

stream.pipe(map(multiply)).pipe(filter(greaterThan500)).pipe(reduce(sum, 0));

Putting everything together, the final code looks as follows.

// The map, filter, and reduce helpers live in their own module as transform stream factories.
const { map, filter, reduce } = require('./streamUtils');

const MULTIPLIER = 100;

const multiply = (number) => number.value * MULTIPLIER;
const greaterThan500 = (value) => value > 500;
const sum = (sum, value) => value + sum;

const sumOfBiggerNumbers = async () => {
  const stream = await DB.getNumbersAsStream({ collection: Number });

  return stream
    .pipe(map(multiply))
    .pipe(filter(greaterThan500))
    .pipe(reduce(sum, 0)); // The initial value is required; without it the sum is NaN.
};

With this approach, we avoid the issues we had in the initial example. Memory use stays low because a stream holds only a small, bounded buffer of chunks at any time instead of the whole data set. Furthermore, the data set is never traversed multiple times. Instead, every chunk passes through the map and filter functions once before proceeding to the reduce function, which also processes the chunks one after the other.
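To read the reduced value out of such a pipeline, one option is to end it with a small writable sink and to connect the stages with pipeline() from stream/promises, which, unlike chained pipe() calls, forwards an error from any stage. The following is a minimal sketch under several assumptions: Node.js 15 or later, trimmed copies of the helpers so that it runs on its own, and an in-memory getNumbersAsStream function standing in for the hypothetical DB.getNumbersAsStream (the third record is invented test data).

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

const MULTIPLIER = 100;

// Helpers in the same shape as above, trimmed to the essentials.
const map = (fn) => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    let result;
    try { result = fn(chunk); } catch (e) { return callback(e); }
    return callback(null, result);
  },
});

const filter = (fn) => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    let keep;
    try { keep = fn(chunk); } catch (e) { return callback(e); }
    return callback(null, keep ? chunk : undefined);
  },
});

const reduce = (fn, acc) => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try { acc = fn(acc, chunk); } catch (e) { return callback(e); }
    return callback();
  },
  flush(callback) { callback(null, acc); },
});

// Hypothetical stand-in for DB.getNumbersAsStream().
const getNumbersAsStream = () => Readable.from([
  { id: 1, value: 150 },
  { id: 2, value: 65 },
  { id: 3, value: 3 },
]);

const sumOfBiggerNumbers = async () => {
  const results = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      results.push(chunk);
      callback();
    },
  });

  // pipeline() rejects if any stage fails and destroys the other streams.
  await pipeline(
    getNumbersAsStream(),
    map((number) => number.value * MULTIPLIER),
    filter((value) => value > 500),
    reduce((sum, value) => sum + value, 0),
    sink,
  );
  return results[0];
};

sumOfBiggerNumbers().then((total) => console.log(total)); // 21500
```

The values 15000 and 6500 pass the filter, 300 does not, and the sink receives the single reduced value once the source has ended.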

So far, we have tried FRP with Node streams, and the result is code that is easier to debug and test. All transformations can be isolated into independent functions with descriptive names and clear signatures, and each one can be covered by its own tests.

Why functional reactive programming in Node.js?

Now that we’ve covered FRP basics with streams, it’s essential to understand why you should use Node.js to implement FRP.

Feature implementation extensibility

The FRP approach simplifies adding new features to an existing project, which particularly benefits projects that demand frequent feature updates.

Code composability

The code is clean, fast, and has a small memory cost. However, it is still somewhat cluttered because each map, filter, and reduce function has to be wrapped in a pipe() call. We have two options: make the code chainable (.map().filter().reduce()) like arrays, or adopt a point-free code style. We can wrap the stream object to make the functions chainable.
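One possible shape for such a wrapper is sketched below. The chain and attempt functions are hypothetical names invented for this example, and the input comes from Readable.from() rather than a database. Each method pipes the current stream into a new Transform and wraps the result again, so the calls can be chained like array methods.

```javascript
const { Readable, Transform } = require('stream');

// Run fn(); send a thrown error to the stream callback, otherwise continue.
const attempt = (fn, callback, onSuccess) => {
  let result;
  try {
    result = fn();
  } catch (e) {
    return callback(e);
  }
  return onSuccess(result);
};

// Hypothetical wrapper: every method pipes into a new Transform and re-wraps it.
const chain = (stream) => {
  const next = (transform, flush) =>
    chain(stream.pipe(new Transform({ objectMode: true, transform, flush })));

  return {
    stream, // The underlying Node stream, for listeners or further piping.
    map: (fn) => next((chunk, encoding, callback) =>
      attempt(() => fn(chunk), callback, (value) => callback(null, value))),
    filter: (fn) => next((chunk, encoding, callback) =>
      attempt(() => fn(chunk), callback,
        (keep) => callback(null, keep ? chunk : undefined))),
    reduce: (fn, acc) => next(
      (chunk, encoding, callback) =>
        attempt(() => fn(acc, chunk), callback, (value) => {
          acc = value;
          callback();
        }),
      (callback) => callback(null, acc),
    ),
  };
};

const result = chain(Readable.from([{ value: 150 }, { value: 65 }, { value: 3 }]))
  .map((number) => number.value * 100)
  .filter((value) => value > 500)
  .reduce((sum, value) => sum + value, 0);

result.stream.on('data', (total) => console.log(total)); // 21500
```

Because the wrapper relies on pipe() internally, an error in an early stage is not forwarded to later ones, so a production version would need its own error handling.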

Scalability

Writing functional reactive code simplifies code management and increases application scalability.

Readability

Functional reactive programming paradigms can considerably reduce the number of code lines needed to develop a feature. In addition, the reactive codebase becomes more readable and comprehensible because deeply nested callbacks are reduced.

Drawbacks of functional reactive programming in Node.js

On the other hand, the FRP technique has some disadvantages:

  • Code can become redundant when the workflow is split up and its pieces are dispersed across services.
  • Solid expertise in streams and event loops is essential in designing reactive services.

When to use the functional reactive programming method in Node.js?

FRP might not fit everywhere. However, it is ideal in situations such as the following:

  • When we need to decentralize the application flow into maintainable microservices.
  • When an application has to reach production on a tight schedule.
  • When a brief failure in an upstream dependency could otherwise cause the entire system to fail.
  • When there are many asynchronous blocks of code whose results may arrive late.

Conclusion

FRP is more than a utility or a library. It shapes the way you architect and conceive apps. This article discussed the concept of functional reactive programming with Node.js streams along with its advantages, drawbacks, and applicability. I hope this article gave you new insight and useful ideas for your next Node.js application.

Thank you for reading!

Syncfusion Essential JS 2 is the only suite you will ever need to build an app. It contains over 65 high-performance, lightweight, modular, and responsive UI components in a single package. Download a free trial to evaluate the controls today.

If you have any questions or comments, you can contact us through our support forums, support portal, or feedback portal. We are always happy to assist you!
