DEV Community

Cover image for Streams in NodeJS
Kinanee Samson
Kinanee Samson

Posted on

Streams in NodeJS

Streams, an abstraction for dealing with huge amounts of data efficiently by processing it in small chunks, this is one of the most fundamental concepts in NodeJS and most of the time you'd be consuming API's that implement a stream interface rather than creating one so it's best to familiarize ourselves with the basic workings of streams, in this article we are going to be looking at the different kind of streams, what properties and methods are defined on them, what events they emit and how we interact with instances of those streams in our code.

Streams are built by default to only process strings or buffers, however they can also work with other types of data with the exception of nulll, the objectMode has to be passed to be true when we create a stream if we want the stream to process Objects. All streams store data in an internal buffer and we say that the data is buffered, the amount of data potentially buffered depends on the highWaterMark option passed into the stream's constructor.

When Object mode is set as true the highWaterMark option will be limit of the number objects that is buffered. The highWaterMark option is a threshold, not a limit: it dictates the amount of data that a stream buffers before it stops asking for more data. It does not enforce a strict memory limitation in general. Specific stream implementations may choose to enforce stricter limits but doing so is optional.

Types Of Streams

There are different implementation of streams in NodeJS and we will proceed to looking at them below, we will however only concern ourselves with readable and writeable streams.

  • Readable streams
  • Writeable streams
  • Duplex streams
  • Transform streams

Writeable Streams

Writeable streams are sources that implement the Stream.Writeable that we can write data to, some instances of writeable streams are;

  • Http.ServerResponse on the server
  • fs.createReadableStream()
  • Http.Request on the client.

Methods

All writeable streams have a write method that you can use to write data to the stream.

writeableStream.write('some data');
Enter fullscreen mode Exit fullscreen mode

We can call end() on the writeable stream to close it, optionally we can write one last bit of data to the stream before closing it.

writeableStream.end('some extra data');
Enter fullscreen mode Exit fullscreen mode

Since all streams buffers data internally, a multiple calls to write() method on a stream will lead to the writeable stream buffering data internally, we can also call cork() on a writeable stream, this will ensure that all calls to write() is buffered until we call uncork() or end() calling anyone of these; uncork() or end() will flush the buffered data.

We can call destroy() on a writable stream and we can pass in an optional error message that will cause the writeable stream to emit the error event, this method will also cause the writeable stream to emit the close event.

const myErr = new Error('My error');

writeable.destroy(myErr);

writeable.on('error', err => console.log(err));

writeable.on('close', () => console.log('close'));
Enter fullscreen mode Exit fullscreen mode

We can set a default encoding, that will be used as the data encoding in the stream, by calling setDefaultEncoding('utf8') on the writeable stream.

writeablestream.setDefaultEncoding('base64')
Enter fullscreen mode Exit fullscreen mode

Piping

Writeable streams are pipeable, this means that we if we have a readable stream we can pipe the data coming from the readable stream into a writeable stream, by calling pipe() on the readable stream. The pipe event will be emitted and the source readable stream that we piped to we be passed in as argument to the listener function attached to the unpipe event.

readablestream.pipe(writeablestream)

writeableStream.on('pipe', (src) => console.log(src);
Enter fullscreen mode Exit fullscreen mode

Calling destroy on the writeable stream will detach it from the readable stream it is piped to while emitting the unpipe event, likewise calling end() on the writable stream will also trigger the same effect, if the readable stream is closed it will also emit the unpipe event.

import fs from 'fs';

let hero = {
  name: 'superman',
  alais: 'Clark Kent'
}; 

let writable = fs.createWritableStream('./hero.json');

writeable.write(JSON.stringify(hero, null, 2));

writeable.cork()

writeable.write('Hey im corked');

writeable.uncork()
Enter fullscreen mode Exit fullscreen mode

Readable Stream

Readable streams are sources that we can read data from, they implement Stream.Readable which is defined by the stream class, a good example of a readable stream is process.stdin which allows us to read data entered by the user from the console, others include;

  • Http.IncomingMessage on the Server,
  • fs.createReadableStream
  • server response on the client

A readable stream can be in one of two states, either in flowing state or paused state. In flowing state data that is read from the stream is made available to our application by the event emitter interface. A stream in a paused state can only be read by calling the read() method.

Readable streams begin in paused state but can be switched to the flowing state by any of the following;

  • An event handler can be attached to the data event or we can piping it to a writable stream.
  • Calling the resume() method on the string will also cause the stream to be in a flowing state.

If at any state we wish to change the state of the stream back to paused you can call pause on the readable stream, or we unpipe from all writable streams.

At any point in time a readable stream is in one of three possible states:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

When a readable stream starts out readableFlowing === null because there is no way to consume the data in the stream, readableFlowing === true whenever we pipe() to it, or we attach an event listener to the data event. readableFlowing === false when the we unpipe() from the stream or we call readable.pause(). While a readable stream is in the paused state by calling readable.pause(), attaching an event listener to data event will not cause readable.readableFlowing === true

Readable streams are characterized by emitting a data event whenever some data is pushed to the readable stream, we can push data to a readable stream by calling readableStream.push(data) to send data to a readable stream, this data is stored, buffered internally by the readable stream, the data event serves to notify us of this new data and makes it available to us.

let writable = process.stdout;
let readable = process.stdin;

// readableFlowing == null
writable.write('Hello There, Please enter your name');

readable.on('data', data => writable.write(data));
// readableFlowing == true;

// Instead
// readable.pipe(writeable) 
// will also cause readable.readableFlowing == true;
Enter fullscreen mode Exit fullscreen mode

Its is counter intuitive to use more than one of the available methods of consuming data from a readable stream due to the inconsistencies it could lead to, it is either we pipe the readable stream to a writable stream, or we attach an event listener to the data event.

Piping

Readable streams are called pipeable streams because we can pipe the data being read from a readable stream, straight into a writeable stream, this reduces the amount of data that is buffered internally by the readable stream, we call the pipe we attach a writable stream where the data that is being read will be transferred to.

import fs from 'fs';

let readable = fs.createReadStream('./hero.json');
let writable = process.stdout;

writable.on('pipe', (src) => {
    console.log(src == readable);
    console.log(src);
})

readable.pipe(writeable);
Enter fullscreen mode Exit fullscreen mode

A readable stream will emit the close event whenever the underlying resources in the stream is disposed, while it will emit an end event when we are at the end of the data we are reading. The open event will be emitted whenever the stream is opened and while the ready event is emitted when we can start consuming data from the readable stream.

const  fs  =  require('fs');

let  readable  =  fs.createReadStream('./lorem-Ipsum.txt');

readable.on('open', () =>  console.log('Opened!'));

readable.on('ready', () =>  console.log('ready'));

readable.on('data', data  =>  console.log(data));

readable.on('close', () =>  console.log('closed'));

readable.on('end', () =>  console.log('end'));

// Opened!
// ready
// <Buffer 4c 6f 72 65 6d 20 69 70 73 75 6d 20 64 6f 6c 6f 72 20 73 69 74 20 61 6d 65 74 20 63 6f 6e 73 65 63 74 65 74 75 72 20 61 64 69 70 69 73 63 69 6e 67 20 ... 9830 more bytes>
// end
// closed
Enter fullscreen mode Exit fullscreen mode

Pushing Data To A Readable Stream

Although Readable streams are sources that we can read from, there exists a mechanism for pushing data to the readable stream, we can do this by calling push on the readable stream.

const  stream  =  require('stream');

let  readable  =  new stream.Readable();

readable._read  =  function() {};

readable.push('hello');

readable.on('data', (data) =>  console.log(data.toString()));
Enter fullscreen mode Exit fullscreen mode

That is it for writable and readable streams, hope you found this useful and informative, in our next article we are going to be looking at Duplex and Transform streams, you can definitely check the official NodeJS documentation for more information about streams, if there is anything you would add about readable or writable streams that i left out feel free to chip in it. Definitely leave your experience working with streams in NodeJS.

Oldest comments (0)