DEV Community 👩‍💻👨‍💻

Neha Sharma
Neha Sharma

Posted on

#4 Nodejs Module - streams

In the last 2 blogs we learned about the 2 modules:

  1. OS

  2. FS

Today we will learn new module which can be used with the fs module - streams.

What is streams?

Let me ask question:

In fs we saw that fs module is perfect for handling small size files. But what is small size?

Answer: the small size means a file which your system can handle in one go and store in the system RAM.

Now, if you have the data which is large and more than your system's RAM then how we are going to handle it?

We need to use 'streams'. Streams are streams of data mainly the arrays of binary data that can be read, write, pause, resume. Instead of caching the data and showing cached data, by using the streams we can show the real time data to the users and applications.

All streams are the instance of the eventEmitter. They emit events and we can use these events to read and write data.

Type of streams:

1 . writable:

2 . readable

3 . duplex

4 . transform

Image description

Where to use streams?

Streams are used in handling the large data or live data. Eg: live streaming.

Relationship between - fs, and streams

There is a string relationship between fs and streams. One can use both in combination to accomplish the tasks. Eg: one can open a writable stream, write data in a file using fs module, and then start a readable stream to read the data.

Writeable

writeable streams is an abstraction of a destination where the data can be written. All writeable streams has write and end method.

Image description

All write streams internally get buffered. We can use cork() to make sure they are getting buffered until we call uncork() or end(), this will flush the buffered data.destory() , we can pass an optional parameter error message that will cause the writeable stream to emit the error event, this method will also cause the writeable stream to emit the close event.

Methods

  • write

  • end

  • cork

  • uncork

  • destroy

Important events

  • drain

  • finish

  • pipe/unpipe

  • error

  • close

const writeStream = fs.createWriteStream(`streamData.txt`);

writeStream.write('data', 'utf-8');

writeStream.end();

writeStream.on('finish', () => {
    console.log('success!!')
})

writeStream.on('error', (err) => {
    console.log(err)
})

Enter fullscreen mode Exit fullscreen mode

Readable

readable streams is an abstraction for a data source from where the data can be read.

Image description

A readable stream can be in 2 states:

  1. flow

  2. pause

Default state is pause. We can resume a pause state (or move to a flow state) by using resume() method or we can pause a stream by using pause().

The stream in flow mode can be made available to application by using eventEmitter interface on. The stream in the pause mode can be only read by calling read() method.

To check the state of the stream we can use:

readable.readableFlowing

The possible mode:

  1. null: there is no data to consume

  2. true: indicates the stream is in flow state. When we use pipe() or data()

  3. false: indicates the stream is in pause state. When we use unpipe()

Important events

  • data

  • end

  • error

  • close

  • readable

const readStream = fs.createReadStream(`${FILE_NAME}`);
readStream.setEncoding('UTF8');

readStream.on('stream', (chunk) => {
    data += chunk;
})

readStream.on('end',function() {
    console.log(data);
 });

readStream.on('error', function(err) {
    console.log(err.stack);
 });

Enter fullscreen mode Exit fullscreen mode

Duplex

Duplex is both read and write streams. It has 2 internal buffers, one for the readable and one for the writeable. It enables to read and write data concurrently.

Example: chat application, socket server
Image description


net.createServer(socket => {
    socket.pipe(socket)
}).listen(8001);

Enter fullscreen mode Exit fullscreen mode

Transform

It is duplex but only addition is the data can be transform or modify as it is written or read. It is a powerful way of doing advance work. Eg: you can read the data and convert the data before writing to uppercase, or convert to different language, or zip and write to a destination.

Image description


const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

Enter fullscreen mode Exit fullscreen mode

pipe() and pipeline()

pipe is useful and important feature. Through pipe we can connect the streams. So, we can start a readableStream and while stream is getting read same time we can write the stream to a file. This can be done by using pipe. Pipe reduce the internal buffering of the data by readable streams. Readable streams are call pipeable streams.

However, do not use pipe() in production. As pipe can introduce memory leaks.

  1. If one of the piped streams is closed or throws an error, pipe() will not automatically destroy the connected streams

  2. pipe() does not automatically forward errors across streams to be handled in one place.

Hence, use pipeline() to cater the above issues. pipeline() accepts a callback function as the last parameter. Any forwarded errors from any of the piped streams will call the callback, so it's easier to handle errors for all streams in one place.

Think like what pipe does in real world they connect and let the flow of water. Similarly we are doing the same here.

For example, it handles errors, end-of-files, and the cases when one stream is slower or faster than the other.


const readerPipe = fs.createReadStream(`${FILE_NAME}`);
const writerPipe = fs.createWriteStream(`output.txt`);

readerPipe.pipeline(writerPipe)

readerPipe.on('stream', (chunk) => {
    data += chunk;
})

readerPipe.on('finish', () => {
    console.log('success!!');
})

console.log('success')

Enter fullscreen mode Exit fullscreen mode

Backpressuring

Let's take an example, assume you have are reading a big file and writing to a another file using streams and pipe. There could be a situation where the flow of the data is too fast and consume of the data is slow. In this situation, the data can be keep buffering and lead to:

  1. High memory usage

  2. Poor garbage collector performance

  3. May cause application to crash.

So, what should we can do in such situation?

We read that the both stream - readable and writeable stream has internal buffer but the size of the buffer is limited to the available memory of the system. So, if we want to set a indicator to stop the flow of the data and resume later, how we can do it?

We can set a highWatermark which will put a threshold on the data that will be streamed and once the threshold is crossed we can pause' stream andresume' it.

PS: this is just a thresholder indicator.

const readableStream = fs.createReadStream('./logs.txt', {
    highWaterMark: 10
});

readableStream.on('readable', () => {
    process.stdout.write(`[${readableStream.read()}]`);
});

readableStream.on('end', () => {
    console.log('DONE');
});

Enter fullscreen mode Exit fullscreen mode

Important

  1. Use streams where the data is large.

  2. Use pipe to have the write and read in parallel.

  3. readable streams are pipeable streams.

  4. Do not use pipe() in production. As it could introduce memory leaks. Use pipeline()

  5. We have 2 states of the readable stream: flow and pause. By default the stream is in pause.

  6. highWatermark puts threshold to control the flow of the data.

  7. Streams would be in array of binary data, use JSON.stringify() to convert to string.

Happy Learning!!

Resources

NodeJS Documentation

Like it? Do follow me on Twitter and Linkedin.

Top comments (1)

DEV

Thank you.

 
Thanks for visiting DEV, we’ve worked really hard to cultivate this great community and would love to have you join us. If you’d like to create an account, you can sign up here.