Matt Eddy

Node.js - Streams

Overview

This article is Part 4 of Working With NodeJS, a series of articles to help simplify learning Node.js. In this article I will cover Streams.

Introduction

Working with large amounts of data usually means working with streams. Streams allow for large data processing without requiring excessive compute resources. With streams you read data piece by piece, processing its content without keeping it all in memory.

Stream Basics

A stream is a collection of data that may not be available all at once. Think about watching a YouTube or Netflix video. When the video starts, it starts with just enough data to get the video going, and the remaining data is processed over time. This type of data handling provides two major advantages over other data handling methods:

  1. Memory efficiency: you don't need to load large amounts of data in memory before you are able to process it.
  2. Time efficiency: it takes way less time to start processing data as soon as you have it, rather than waiting until the whole data payload is available to start.

With efficient data handling, we can deliver high-volume data processing applications in Node.js. The stream module provides the foundation upon which all streaming APIs are built in Node.js. Within the stream module are four types of streams: Readable, Writable, Duplex, and Transform. Readable streams, along with the Duplex and Transform streams that build on them, implement the pipe method, which connects the output of a source stream to a destination stream. Connecting two or more streams can be thought of as a pipeline, where the data passes through each phase until it reaches its final destination.
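
As a rough illustration of the memory efficiency point, the sketch below produces a large number of lines lazily and totals their size one chunk at a time, so the full payload is never held in memory. The generateLines and countBytes names are hypothetical helpers introduced only for this example, and it assumes a Node.js version that provides Readable.from and async iteration over streams (12.3 or later).

```javascript
'use strict'
const { Readable } = require('stream')

// Hypothetical generator: yields one line at a time instead of building a big array
function * generateLines (count) {
  for (let i = 1; i <= count; i++) yield `line ${i}\n`
}

// Hypothetical helper: consumes the stream chunk by chunk, keeping only a running total
const countBytes = async (readable) => {
  let total = 0
  for await (const chunk of readable) {
    total += Buffer.byteLength(chunk)
  }
  return total
}

countBytes(Readable.from(generateLines(100000)))
  .then((total) => { console.log('processed bytes:', total) })
```

Only the current chunk and the running total are alive at any moment, which is the property that makes streams suitable for data larger than available memory.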

Readable Stream

A readable stream could be used to read a file, read data from an incoming HTTP request, or read user input from a command prompt, to name a few examples. The Readable constructor inherits from the Stream constructor, which inherits from the EventEmitter constructor, so readable streams are event emitters. To create a readable stream, the Readable constructor is called with the new keyword and passed an options object with a read method.

code snippet
'use strict'
const { Readable } = require('stream')
const createReadStream = () => {
  const data = ['some', 'data', 'to', 'read']
  return new Readable({
    encoding: 'utf8',
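    read () {
      // Pushing null signals the end of the stream
      if (data.length === 0) this.push(null)
      // Otherwise hand the next chunk to the stream's internal buffer
      else this.push(data.shift())
    }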
  })
}
const readable = createReadStream()
readable.on('data', (data) => { console.log('got data', data) })
readable.on('end', () => { console.log('finished reading') })
output
got data some
got data data
got data to
got data read
finished reading
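
Because readable streams are event emitters, the same stream can also be consumed in paused mode by listening for the readable event and calling read until it returns null. The sketch below assumes the createReadStream factory from the snippet above; readAll is a hypothetical helper name. Chunks may be combined when read this way, so the example concatenates them instead of relying on chunk boundaries.

```javascript
'use strict'
const { Readable } = require('stream')

const createReadStream = () => {
  const data = ['some', 'data', 'to', 'read']
  return new Readable({
    encoding: 'utf8',
    read () {
      if (data.length === 0) this.push(null)
      else this.push(data.shift())
    }
  })
}

// Hypothetical helper: drains a stream in paused mode and resolves with its text
const readAll = (readable) => new Promise((resolve, reject) => {
  let text = ''
  readable.on('readable', () => {
    let chunk
    // read() returns null once the internal buffer is empty
    while ((chunk = readable.read()) !== null) text += chunk
  })
  readable.on('end', () => { resolve(text) })
  readable.on('error', reject)
})

readAll(createReadStream()).then((text) => { console.log('read:', text) })
```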

Writable Stream

A writable stream could be used to write a file, write data to an HTTP response, or write to the terminal. The Writable constructor inherits from the Stream constructor, which inherits from the EventEmitter constructor, so writable streams are event emitters. To create a writable stream, call the Writable constructor with the new keyword. The options object of the Writable constructor can have a write function, which takes three arguments: chunk, enc, and next. To send data to a writable stream, use the write method. The end method can be used to write a final payload to the stream before ending it. Once the stream has ended, the finish event is emitted.

'use strict'
const { Writable } = require('stream')
const createWriteStream = (data) => {
  return new Writable({
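    // Keep string chunks as strings instead of converting them to Buffers
    decodeStrings: false,
    write (chunk, enc, next) {
      data.push(chunk);
      // Signal that this chunk has been handled so the next write can proceed
      next();
    }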
  })
}
const data = [];
const writable = createWriteStream(data);
writable.on('finish', () => {console.log('finished', data)});
writable.write('A');
writable.write('B');
writable.write('C');
writable.end('nothing more to write');
output
finished [ 'A', 'B', 'C', 'nothing more to write' ]
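
The write method also returns a boolean: it returns false once the stream's internal buffer has reached its highWaterMark, and the drain event signals when it is safe to write again. The sketch below shows one way to respect that signal. The createSlowWritable and writeAll names are hypothetical, the highWaterMark of 4 is an arbitrary value chosen to trigger the behavior quickly, and events.once assumes Node.js 11.13 or later.

```javascript
'use strict'
const { Writable } = require('stream')
const { once } = require('events')

// Hypothetical slow destination: completes each write on a later tick
const createSlowWritable = (sink) => {
  return new Writable({
    highWaterMark: 4, // arbitrary small limit for illustration
    decodeStrings: false,
    write (chunk, enc, next) {
      sink.push(chunk)
      setImmediate(next)
    }
  })
}

// Hypothetical helper: writes every chunk, pausing whenever write() returns false
const writeAll = async (writable, chunks) => {
  let waits = 0
  const finished = once(writable, 'finish')
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      waits++
      await once(writable, 'drain')
    }
  }
  writable.end()
  await finished
  return waits
}

const sink = []
writeAll(createSlowWritable(sink), ['aaaa', 'bbbb', 'cccc'])
  .then((waits) => { console.log('finished', sink, 'waited for drain', waits, 'times') })
```

Ignoring the return value of write still works, but the stream then buffers everything in memory, which defeats the purpose of streaming.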

Duplex Stream

Duplex streams are streams that implement both the Readable and Writable interfaces. A good example of a Duplex stream is a TCP socket. A TCP socket can read data from a client connection as well as write back to the client. To demonstrate, I'll create two files to simulate a TCP server and a client connection.

TCP server
'use strict'
const net = require('net')
net.createServer((socket) => {
  const interval = setInterval(() => {
    socket.write('beat')
  }, 1000)
  socket.on('data', (data) => {
    socket.write(data.toString().toUpperCase())
  })
  socket.on('end', () => { clearInterval(interval) })
}).listen(3000)
client connection
'use strict'
const net = require('net')
const socket = net.connect(3000)

socket.on('data', (data) => {
  console.log('got data:', data.toString())
})

setTimeout(() => {
  socket.write('all done')
  setTimeout(() => {
    socket.end()
  }, 250)
}, 3250)

Starting the server and then running the client will produce the following output from the client:

output
got data: beat
got data: beat
got data: beat
got data: ALL DONE
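
The socket above comes from the net module. As a minimal in-memory sketch of the same idea, the Duplex constructor accepts both a read and a write function, and the two sides operate independently of each other. The createDuplex factory and its outgoing and received arrays are hypothetical names used only for this example.

```javascript
'use strict'
const { Duplex } = require('stream')

// Hypothetical factory: the readable side emits `outgoing` (consuming the array),
// while the writable side collects whatever is written into `received`
const createDuplex = (outgoing, received) => {
  return new Duplex({
    encoding: 'utf8',
    decodeStrings: false,
    read () {
      if (outgoing.length === 0) this.push(null)
      else this.push(outgoing.shift())
    },
    write (chunk, enc, next) {
      received.push(chunk)
      next()
    }
  })
}

const received = []
const duplex = createDuplex(['beat', 'beat'], received)
duplex.on('data', (data) => { console.log('got data:', data) })
duplex.on('end', () => { console.log('received:', received) })
duplex.write('all done')
duplex.end()
```

Nothing written to this stream ever appears on its readable side, which is the difference between a plain Duplex and a Transform.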

Transform Stream

Transform streams are duplex streams with an additional constraint applied to enforce the relationship between the read and write interfaces. The constraint between the read and write interfaces is enforced through the transform function. The transform function has the same signature as the write function from the Writable stream object in that it takes chunk, enc, and next as parameters. The difference is that the next function can be passed a second argument, which should be the result of applying some kind of transform operation to the incoming chunk. Let's see a quick example.

code snippet
'use strict'
const { Transform } = require('stream')
const createTransformStream = () => {
  return new Transform({
    decodeStrings: false,
    encoding: 'utf8',
    transform (chunk, enc, next) {
      // First argument is an error (or null), second is the transformed chunk
      next(null, chunk.toUpperCase());
    }
  })
}
const transform = createTransformStream()
transform.on('data', (data) => {
  console.log('got data:', data);
})
transform.write('a\n');
transform.write('b\n');
transform.write('c\n');
transform.end('nothing more to write');
output
got data: A

got data: B

got data: C

got data: NOTHING MORE TO WRITE
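
Passing a result to next is shorthand for calling this.push with that result and then calling next with no data. Calling push directly is useful when one incoming chunk should produce several outgoing chunks, or none. The sketch below is a hypothetical splitter, not part of the stream module, and it assumes that no field is split across two incoming chunks.

```javascript
'use strict'
const { Transform } = require('stream')

// Hypothetical transform: turns each comma-separated chunk into one chunk per field
const createSplitStream = () => {
  return new Transform({
    decodeStrings: false,
    encoding: 'utf8',
    transform (chunk, enc, next) {
      for (const field of chunk.split(',')) {
        // push may be called any number of times for a single incoming chunk
        if (field !== '') this.push(field.toUpperCase())
      }
      // No second argument here: everything was already pushed above
      next()
    }
  })
}

const split = createSplitStream()
split.on('data', (data) => { console.log('got data:', data) })
split.write('a,b')
split.end('c')
```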

Piping Streams

As stated before, the pipe method takes the source of one stream and pipes it into the destination of another stream. Let's see a quick example. I'll refactor the Readable and Writable examples from the previous sections so they work together using the pipe method.

'use strict'
const { Readable, Writable } = require('stream')
const createReadStream = () => {
  const readData = ['some', 'data', 'to', 'read'];
  return new Readable({
    encoding: 'utf8',
    read () {
      if (readData.length === 0) this.push(null)
      else this.push(readData.shift())
    }
  })
}

const createWriteStream = (data) => {
  return new Writable({
    decodeStrings: false,
    write (chunk, enc, next) {
      data.push(chunk);
      next();
    }
  })
}
const data = [];
const readable = createReadStream();
const writable = createWriteStream(data);
readable.pipe(writable);
writable.on('finish', () => {console.log('finished', data)});
output
finished [ 'some', 'data', 'to', 'read' ]

In the code snippet above, the readable.on('data') handler has been removed because pipe now consumes the stream. Readable streams start in a paused state; data begins flowing only when you call the resume method, attach a data event handler, or call the pipe method. You can pipe as many streams as you need to satisfy your use case. However, it is best practice to use a pipeline when piping more than two streams.
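
To make the chaining concrete: pipe returns its destination stream, so calls can be strung together when the destination is itself readable. The sketch below adds a hypothetical createUpperCaseStream transform as a middle step. It also hints at why chained pipe calls become awkward as the chain grows, since pipe does not forward errors from one stream to the next.

```javascript
'use strict'
const { Readable, Writable, Transform } = require('stream')

const createReadStream = () => {
  const readData = ['some', 'data', 'to', 'read']
  return new Readable({
    read () {
      if (readData.length === 0) this.push(null)
      else this.push(readData.shift())
    }
  })
}

// Hypothetical transform used only to give the chain a middle step
const createUpperCaseStream = () => {
  return new Transform({
    transform (chunk, enc, next) {
      next(null, chunk.toString().toUpperCase())
    }
  })
}

const data = []
const writable = new Writable({
  write (chunk, enc, next) {
    data.push(chunk.toString())
    next()
  }
})

// Each pipe call returns its destination, so the last call returns `writable`
const chained = createReadStream()
  .pipe(createUpperCaseStream())
  .pipe(writable)

chained.on('finish', () => { console.log('finished', data) })
// This handler only sees errors from `writable`; errors from the earlier
// streams are not forwarded by pipe and would need their own handlers
chained.on('error', (err) => { console.error('write failed', err) })
```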

A pipeline can be used to pipe a series of streams together. Let's see an example. I'll refactor the code from the Readable, Writable, and Transform sections so they work using a pipeline.

pipeline snippet
'use strict'
const { Readable, Writable, Transform, pipeline } = require('stream')
const createReadStream = () => {
  const readData = ['some', 'data', 'to', 'read'];
  return new Readable({
    encoding: 'utf8',
    read() {
      if (readData.length === 0) this.push(null);
      else this.push(readData.shift());
    }
  })
}

const createTransform = () => {
  return new Transform({
    transform(chunk, enc, next) {
      const changedData = chunk.toString().toUpperCase();
      next(null, changedData);
    }
  })
}

const createWriteStream = () => {
  const data = [];
  const writable = new Writable({
    decodeStrings: false,
    write(chunk, enc, next) {
      data.push(chunk.toString());
      next();
    }
  });
  writable.data = data;
  return writable;
}

const readable = createReadStream();
const writable = createWriteStream();
const transform = createTransform();
pipeline(readable, transform, writable, (err) => {
  if (err) console.error('Pipeline failed.', err);
  else console.log('Pipeline succeeded.', writable.data);
});
output
Pipeline succeeded. [ 'SOME', 'DATA', 'TO', 'READ' ]

In the above code snippet, I imported the pipeline function from the stream module. Next, I created three variables that reference the readable, transform, and writable streams the data flows through. Finally, pipeline takes a callback with an err parameter that executes once the pipeline completes. If an error occurs in any of the streams, the pipeline fails and the error is logged; otherwise the console logs the data with a success message.
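
The failure path is easier to see with a stream that actually errors. The sketch below wraps pipeline with util.promisify so it can be awaited, and uses a transform that rejects one word. The run helper and its forbidden parameter are hypothetical names for illustration only.

```javascript
'use strict'
const { Readable, Writable, Transform, pipeline } = require('stream')
const { promisify } = require('util')

const pipelineAsync = promisify(pipeline)

// Hypothetical helper: pushes words through a transform that fails on `forbidden`
const run = async (words, forbidden) => {
  const source = [...words]
  const seen = []
  const readable = new Readable({
    read () {
      if (source.length === 0) this.push(null)
      else this.push(source.shift())
    }
  })
  const check = new Transform({
    transform (chunk, enc, next) {
      const word = chunk.toString()
      // Passing an error as the first argument makes the whole pipeline fail
      if (word === forbidden) next(new Error(`forbidden word: ${word}`))
      else next(null, word.toUpperCase())
    }
  })
  const writable = new Writable({
    write (chunk, enc, next) {
      seen.push(chunk.toString())
      next()
    }
  })
  try {
    await pipelineAsync(readable, check, writable)
    return { ok: true, seen }
  } catch (err) {
    return { ok: false, message: err.message }
  }
}

run(['some', 'data', 'to', 'read'], 'to').then((result) => { console.log(result) })
```

An error raised in the middle stream reaches the single catch block, which is the main convenience pipeline offers over chained pipe calls. Newer Node.js versions also ship a promise-based pipeline in the stream/promises module.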

There is much more to streams than the examples in this article. I always suggest using the NodeJS Docs as your primary source when developing a solution for your use case. There are also a lot of good articles written by other developers here on DEV that can help you when developing with streams in Node.js. Another good author is Samer Buna. Samer has a lot of good content on advanced topics about Node.js. As always, if you have any questions, post them in the discussions and I will reply. Take care and happy coding.
