Manoj Kumar Patra

Coding with Streams

# Buffer vs stream

With buffer mode, for an input operation, all the data coming from a resource over time is collected into a buffer until the operation is complete. It is then passed back to the caller as one single blob of data.

With streams, the data is processed as soon as it arrives from the resource.

Streams are more efficient both in terms of space (memory usage) and time (computation time).

Spatial efficiency

Buffers in V8 have limited size. To find out the maximum size of a buffer:


```js
import buffer from "buffer";
console.log(buffer.constants.MAX_LENGTH);

```

Trying to read a file that is bigger than the maximum allowed buffer size will result in the following error:


```
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (8130792448) is greater
than possible Buffer: 2147483647 bytes

```

Example:

Gzip using buffers


```js
import { promises as fs } from 'fs';
import { gzip } from 'zlib';
import { promisify } from 'util';

const gzipPromise = promisify(gzip);

const filename = process.argv[2];

async function main () {
  const data = await fs.readFile(filename);
  const gzippedData = await gzipPromise(data);
  await fs.writeFile(`${filename}.gz`, gzippedData);
  console.log('File successfully compressed');
}

main();

```

Gzip using streams - This code runs smoothly against files of any size and with constant memory utilization.


```js
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

const filename = process.argv[2];

createReadStream(filename)
  .pipe(createGzip())
  .pipe(createWriteStream(`${filename}.gz`))
  .on('finish', () => console.log('File successfully compressed'));

```

Time efficiency

Consider an application where the client compresses a file and uploads it to a remote HTTP server, which, in turn, decompresses it and saves it on the filesystem.

Implementing this using streams:

```js
// gzip-receive.js

import { createServer } from 'http';
import { createWriteStream } from 'fs';
import { createGunzip } from 'zlib';
import { basename, join } from 'path';
import { createDecipheriv, randomBytes } from 'crypto';

// generate the shared secret and print it, so that the client can pass it on the command line
const secret = randomBytes(24);
console.log(`Generated secret: ${secret.toString('hex')}`);

const server = createServer((req, res) => {
  // basename is used to remove any possible path from the name of the received file
  // (note: Node.js lowercases incoming header names)
  const filename = basename(req.headers['x-filename']);
  const iv = Buffer.from(
    req.headers['x-initialization-vector'], 'hex'
  );
  const destFilename = join('received_files', filename);
  console.log(`File request received: ${filename}`);

  // req is a stream object
  req.pipe(createDecipheriv('aes192', secret, iv))
    .pipe(createGunzip())
    .pipe(createWriteStream(destFilename))
    .on('finish', () => {
      res.writeHead(201, { 'Content-Type': 'text/plain' });
      res.end('OK\n');
      console.log(`File saved: ${destFilename}`);
    });
});

server.listen(3000, () => console.log('Listening on http://localhost:3000'));
```

```js
// gzip-send.js

import { request } from 'http';
import { createGzip } from 'zlib';
import { createReadStream } from 'fs';
import { basename } from 'path';
import { createCipheriv, randomBytes } from 'crypto';

const filename = process.argv[2];
const serverHost = process.argv[3];
const secret = Buffer.from(process.argv[4], 'hex');
const iv = randomBytes(16);

const httpRequestOptions = {
  hostname: serverHost,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip',
    'X-Filename': basename(filename),
    'X-Initialization-Vector': iv.toString('hex')
  },
};

const req = request(httpRequestOptions, (res) => {
  console.log(`Server response: ${res.statusCode}`);
});

// use streams to read the data from the file,
// then compress and send each chunk as soon as it is read from the filesystem
createReadStream(filename)
  .pipe(createGzip())
  .pipe(createCipheriv('aes192', secret, iv))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });
```

With streams, as soon as the first chunk is received it can be decompressed and written, and while that is happening the next chunk can already be received: the stages of the pipeline work on different chunks at the same time because each task is asynchronous. Node.js streams preserve the order of the chunks, and the entire process takes less time to complete because reading, compressing, transferring, and writing overlap instead of running one after the other.

# Stream basics

Every stream in Node.js is an implementation of one of the four base abstract classes available in the stream core module:

  • Readable
  • Writable
  • Duplex
  • Transform

Every stream is also an instance of EventEmitter.

Streams support two operating modes:

  1. Binary mode: To stream data in the form of chunks, such as buffers or strings
  2. Object mode: To stream data as a sequence of discrete objects
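
For example, here is a minimal sketch of an object-mode Readable (the temperature readings are just illustrative data):

```js
import { Readable } from 'stream';

// in object mode, each chunk is a discrete JavaScript object rather than a Buffer or string
const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ temperature: 21 });
    this.push({ temperature: 23 });
    this.push(null); // end of stream
  }
});

objectStream.on('data', (obj) => console.log(obj.temperature));
```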

# Readable streams

Reading from a stream

Non-flowing mode

This is the default operating mode for Readable streams.

readable.read([size])


```js
process.stdin
  .on("readable", () => {
    let chunk;
    console.log("New data available");
    // read the data continuously until the internal buffer is emptied
    // synchronous operation
    // chunk is a Buffer object if the stream is working in binary mode
    while((chunk = process.stdin.read()) !== null) {
      console.log(`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`)
    }
  })
  .on("end", () => {
    console.log("End of stream");
  })

```

To read strings from the stream instead of Buffer objects, call setEncoding(encoding) on the stream.
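
For example:

```js
// emit strings decoded as UTF-8 instead of Buffer objects
process.stdin.setEncoding('utf8');

process.stdin.on('readable', () => {
  let chunk;
  while ((chunk = process.stdin.read()) !== null) {
    console.log(`String chunk: "${chunk}"`);
  }
});
```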

Flowing mode


```js
process.stdin
  .on("data", (chunk) => {
    console.log("New data available");
    console.log(
      `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
    );
  })
  // pause() stops the stream from emitting data events;
  // any incoming data is cached in the internal buffer while paused
  // (this switches the stream back to non-flowing mode)
  .pause()
  // resume() switches the stream to flowing mode again
  .resume()
  .on("end", () => {
    console.log("End of stream");
  });
```

Async iterators

```js
async function main () {
  for await (const chunk of process.stdin) {
    console.log('New data available');
    console.log(
      `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
    );
  }
  console.log('End of stream');
}
main();

```

Readable stream implementation


```js
import { Readable } from "stream";
import Chance from "chance";

const chance = new Chance();

export class RandomStream extends Readable {
  constructor(options) {
    super(options);
    this.emittedBytes = 0;
  }

  _read (size) {
    const chunk = chance.string({ length: size });
    this.push(chunk, "utf-8");
    this.emittedBytes += chunk.length;
    if (chance.bool({ likelihood: 5 })) {
      // End the stream
      this.push(null);
    }
  }
}

const randomStream = new RandomStream();
randomStream
  .on("data", (chunk) => {
    console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`);
  })
  .on("end", () => {
    console.log(`Produced ${randomStream.emittedBytes} bytes of random data`);
  });

```

```ts
interface StreamOptions<T extends Stream> extends Abortable {
  emitClose?: boolean | undefined;
  // The upper limit of the data stored in the internal buffer, after which no more reading from the source should be done
  // Defaults to 16KB
  highWaterMark?: number | undefined;
  // A flag to enable object mode -> defaults to false
  objectMode?: boolean | undefined;
  construct?(this: T, callback: (error?: Error | null) => void): void;
  destroy?(this: T, error: Error | null, callback: (error: Error | null) => void): void;
  autoDestroy?: boolean | undefined;
}

interface ReadableOptions extends StreamOptions<Readable> {
  // Used to convert buffers into strings -> defaults to null
  encoding?: BufferEncoding | undefined;
  read?(this: Readable, size: number): void;
}

```

Readable.from() -> to create Readable streams from arrays or other iterable objects (generators, iterators and async iterators).
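
For example, a minimal sketch (the mountains array is just sample data):

```js
import { Readable } from 'stream';

const mountains = [
  { name: 'Everest', height: 8848 },
  { name: 'K2', height: 8611 }
];

// Readable.from() operates in object mode by default
Readable.from(mountains)
  .on('data', (mountain) => {
    console.log(`${mountain.name} is ${mountain.height}m high`);
  })
  .on('end', () => console.log('Done'));
```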

Simplified readable

```js
import { Readable } from 'stream';
import Chance from 'chance';
const chance = new Chance();

let emittedBytes = 0;
const randomStream = new Readable({
  read (size) {
    const chunk = chance.string({ length: size })
    this.push(chunk, 'utf8')
    emittedBytes += chunk.length
    if (chance.bool({ likelihood: 5 })) {
      this.push(null)
    }
  }
});

```

# Writable streams

writable.write(chunk, [encoding], [callback])

writable.end([chunk], [encoding], [callback]) -> here, the callback function is equivalent to registering a listener to the finish event, which is fired when all the data written in the stream has been flushed into the underlying resource.


```js
import { createServer } from "http";
import Chance from "chance";

const chance = new Chance();
const server = createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  while (chance.bool({ likelihood: 95 })) {
    res.write(`${chance.string()}\n`);
  }
  res.end("\n\n");
  res.on("finish", () => {
    console.log("All data sent");
  });
});

server.listen(8080, () => {
  console.log("listening on http://localhost:8080");
});

```

Backpressure

When the size of the internal buffer exceeds the highWaterMark limit, writable.write() returns false. Once the buffer is emptied, the drain event is emitted, communicating that it's safe to start writing again.


```js
const { createServer } = require("http");
const Chance = require("chance");

const chance = new Chance();

const server = createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  function generateMore () {
    while (chance.bool({ likelihood: 95 })) {
      const randomChunk = chance.string({
        length: (16 * 1024) - 1
      });
      const shouldContinue = res.write(`${randomChunk}`);
      if (!shouldContinue) {
        console.log("back-pressure");
        return res.once("drain", generateMore);
      }
    }
    res.end("\n\n");
  }
  generateMore();
  res.on("finish", () => console.log('All data sent'));
});

```

Writable stream implementation


```js
import { Writable } from "stream";
import { promises as fs } from "fs";
import { dirname, join } from "path";
import mkdirp from "mkdirp-promise";

class ToFileStream extends Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _write(chunk, encoding, cb) {
    mkdirp(dirname(chunk.path))
      .then(() => fs.writeFile(chunk.path, chunk.content))
      .then(() => cb())
      .catch(cb);
  }
}

const tfs = new ToFileStream();
tfs.write({
  path: join('files', 'file1.txt'), content: 'Hello' })
tfs.write({
  path: join('files', 'file2.txt'), content: 'Node.js' })
tfs.write({
  path: join('files', 'file3.txt'), content: 'streams' })
tfs.end(() => console.log('All files created'))

```


```ts
interface WritableOptions extends StreamOptions<Writable> {
  decodeStrings?: boolean | undefined;
  defaultEncoding?: BufferEncoding | undefined;
  write?(this: Writable, chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void;
  writev?(
    this: Writable,
    chunks: Array<{
      chunk: any;
      encoding: BufferEncoding;
    }>,
    callback: (error?: Error | null) => void
  ): void;
  final?(this: Writable, callback: (error?: Error | null) => void): void;
}
```

Simplified writable

```js
const tfs = new Writable({
  objectMode: true,
  write (chunk, encoding, cb) {
    mkdirp(dirname(chunk.path))
      .then(() => fs.writeFile(chunk.path, chunk.content))
      .then(() => cb())
      .catch(cb);
  }
});
```

# Duplex streams

A Duplex stream is a stream that is both Readable and Writable.

  • Used for an entity that is both a data source and a data destination
  • Inherit both from stream.Readable and stream.Writable
  • Both read() and write() methods are available
  • Can listen on both readable and drain events
  • To build a Duplex stream, we need to provide both _read() and _write() methods (a minimal sketch follows the options list below)

Options of importance:

  1. allowHalfOpen -> Defaults to true; if set to false, the stream automatically ends its writable side when the readable side ends (so both parts end when only one of them does).
  2. readableObjectMode
  3. writableObjectMode
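
A minimal sketch of a custom Duplex stream (the greetings data and the logging are just illustrative):

```js
import { Duplex } from 'stream';

const greetings = ['hello', 'hi', 'hey'];

const duplex = new Duplex({
  read(size) {
    // the readable side emits one greeting per call; push(null) ends it
    this.push(greetings.shift() ?? null);
  },
  write(chunk, encoding, callback) {
    // the writable side simply logs whatever is written to it
    console.log(`Written: ${chunk.toString()}`);
    callback();
  }
});

duplex.on('data', (chunk) => console.log(`Read: ${chunk.toString()}`));
duplex.write('some data');
duplex.end();
```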

# Transform streams

Transform streams are a special kind of Duplex stream that are specifically designed to handle data transformations.

Examples: zlib.createGzip(), crypto.createCipheriv()

To build a Transform stream, we need to provide the following methods (Transform already implements _read() and _write() for us):

  1. _transform()
  2. _flush() (optional)

Transform stream implementation

Implement a Transform stream that replaces all the occurrences of a given string:

```js
import { Transform } from "stream";

export class ReplaceStream extends Transform {
  constructor(searchStr, replaceStr, options) {
    super({ ...options });
    this.searchStr = searchStr;
    this.replaceStr = replaceStr;
    this.tail = "";
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tail + chunk).split(this.searchStr);
    const lastPiece = pieces[pieces.length - 1];
    // keep a tail around to handle matches that may be split across two chunks
    const tailLen = this.searchStr.length - 1;
    this.tail = lastPiece.slice(-tailLen);
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen);
    this.push(pieces.join(this.replaceStr));
    callback();
  }

  _flush(callback) {
    this.push(this.tail);
    callback();
  }
}

const replaceStream = new ReplaceStream("\n", ":)");
replaceStream.on("data", (chunk) => {
  console.log(chunk.toString());
});
replaceStream.write("Hello\nWo");
replaceStream.write("rld\n");
replaceStream.write("This is a test\n");
replaceStream.end();
```

Filtering and aggregating data with Transform streams:

```csv

type,country,profit
Household,Namibia,597290.92
Baby Food,Iceland,808579.10
Meat,Russia,277305.60
Meat,Italy,413270.00
Cereal,Malta,174965.25
Meat,Indonesia,145402.40
Household,Italy,728880.54

```

```js
import { createReadStream } from "fs";
import { parse } from "csv-parse";
import { Transform } from "stream";

class FilterByCountry extends Transform {
  constructor(country, options = {}) {
    options.objectMode = true;
    super(options);
    this.country = country;
  }

  _transform(record, encoding, callback) {
    // only forward the records that match the given country
    if (record.country === this.country) {
      this.push(record);
    }
    callback();
  }
}

class SumProfit extends Transform {
  constructor(options = {}) {
    options.objectMode = true;
    super(options);
    this.total = 0;
  }

  _transform(record, encoding, callback) {
    this.total += Number.parseFloat(record.profit);
    callback();
  }

  _flush(callback) {
    // emit the aggregated total only when the input has been fully consumed
    this.push(this.total.toString());
    callback();
  }
}

const csvParse = parse({ columns: true });

createReadStream("./data.csv")
  .pipe(csvParse)
  .pipe(new FilterByCountry("Italy"))
  .pipe(new SumProfit())
  .pipe(process.stdout);
```

# PassThrough streams

A PassThrough stream is a special type of Transform stream that outputs every data chunk it receives without applying any transformation.

Observability

```js
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { PassThrough } from 'stream';

let bytesWritten = 0;

const monitor = new PassThrough();

monitor.on('data', (chunk) => {
  bytesWritten += chunk.length;
});

monitor.on('finish', () => {
  console.log(`${bytesWritten} bytes written`);
});

// the monitor can be exercised directly...
monitor.write('Hello!');
monitor.end();

// ...or, more usefully, placed in the middle of a pipeline to observe the data
// flowing through it (use one approach or the other, not both on the same instance)
const filename = process.argv[2];
createReadStream(filename)
  .pipe(createGzip())
  .pipe(monitor)
  .pipe(createWriteStream(`${filename}.gz`));
```

Late piping

Use a PassThrough stream when you need to provide a placeholder for data that will be read or written in the future.

```js
import { createReadStream } from 'fs';
import { createBrotliCompress } from 'zlib';
import { PassThrough } from 'stream';
import { basename } from 'path';
import { upload } from './upload.js';

const filepath = process.argv[2];
const filename = basename(filepath);

const contentStream = new PassThrough();

// start the upload immediately, passing the PassThrough as a placeholder
// for the content that will be produced later
upload(`${filename}.br`, contentStream)
  .then((response) => {
    console.log(`Server response: ${response.data}`);
  })
  .catch((err) => {
    console.error(err);
    process.exit(1);
  });

// only now is the actual data piped into the placeholder
createReadStream(filepath)
  .pipe(createBrotliCompress())
  .pipe(contentStream);
```

Alternate implementation:

```js
function createUploadStream (filename) {
  // the PassThrough acts as a placeholder that the caller can write to later
  const connector = new PassThrough();
  upload(filename, connector);
  return connector;
}

const uploadStream = createUploadStream('a-file.txt');
uploadStream.write('Hello World');
uploadStream.end();
```

# Lazy streams

❓ Why are lazy streams required?

Creating a stream instance might initialize expensive operations straight away (for example, open a file or a socket, initialize a connection to a database, and so on), even before we actually start to use such a stream. This might not be desirable if we are creating a large number of stream instances for later consumption. For this reason, we should consider lazy implementation of streams.

lazystream

This library allows us to effectively create proxies for actual stream instances, where the proxied instance is not created until some piece of code actually starts consuming data from the proxy.
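
For example, a sketch based on lazystream's documented usage (check the library's README for the exact API):

```js
import lazystream from 'lazystream';
import fs from 'fs';

// the factory function is invoked only when something actually starts
// reading from the proxy, so the file is not opened until then
const lazyReadable = new lazystream.Readable(function (options) {
  return fs.createReadStream('path/to/file.txt', options);
});

lazyReadable.pipe(process.stdout);
```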

Custom implementation

```js
import { Readable, Writable } from 'stream';

class LazyStream {
  constructor(streamFactory) {
    this.streamFactory = streamFactory;
    this.stream = null;
  }

  _initializeStream() {
    // create the underlying stream only on first use
    if (!this.stream) {
      this.stream = this.streamFactory();
    }
  }

  createReadableStream() {
    const readable = new Readable({
      read: (size) => {
        // lazily create the underlying stream and wire it up only once
        // (note: this simplified version does not handle backpressure)
        if (!this.stream) {
          this._initializeStream();
          this.stream.on('data', (chunk) => readable.push(chunk));
          this.stream.on('end', () => readable.push(null));
        }
      },
    });
    return readable;
  }

  createWritableStream() {
    return new Writable({
      write: (chunk, encoding, callback) => {
        this._initializeStream();
        this.stream.write(chunk, encoding, callback);
      },
    });
  }
}

export default LazyStream;

// ------------------------------

import fs from 'fs';
import LazyStream from './LazyStream.js';

const createReadStream = () => {
  return fs.createReadStream('path/to/file.txt');
};

const lazyReadStream = new LazyStream(createReadStream);

const readable = lazyReadStream.createReadableStream();
readable.pipe(process.stdout);

const createWriteStream = () => {
  return fs.createWriteStream('path/to/output.txt');
};

const lazyWriteStream = new LazyStream(createWriteStream);

const writable = lazyWriteStream.createWritableStream();
process.stdin.pipe(writable);
```

# Pipes and error handling

The following implementation allows us to ensure that all the allocated resources are properly released, and the error is handled gracefully:

```js
function handleError (err) {
  console.error(err);
  stream1.destroy();
  stream2.destroy();
}

stream1
  .on('error', handleError)
  .pipe(stream2)
  .on('error', handleError);
```

Error handling with pipeline

pipeline(stream1, stream2, stream3, ... , cb)

All the streams are properly destroyed when the pipeline completes successfully or when it's interrupted by an error.

Example:

```js
import { createGzip, createGunzip } from "zlib";
import { Transform, pipeline } from "stream";

const uppercasify = new Transform({
  transform (chunk, enc, cb) {
    this.push(chunk.toString().toUpperCase());
    cb();
  }
});

pipeline(
  process.stdin,
  createGunzip(),
  uppercasify,
  createGzip(),
  process.stdout,
  (err) => {
    if (err) {
      console.error(err);
      process.exit(1);
    }
  },
);
```
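
In modern versions of Node.js, pipeline is also available in a promise-based form from stream/promises, which works well with async/await. A minimal sketch:

```js
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// same guarantees as the callback version: every stream in the chain is
// destroyed if any of them fails
try {
  await pipeline(
    createReadStream('input.txt'),
    createGzip(),
    createWriteStream('input.txt.gz')
  );
  console.log('Pipeline succeeded');
} catch (err) {
  console.error('Pipeline failed', err);
  process.exit(1);
}
```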

# Asynchronous control flow patterns with streams

Sequential execution

```js
import { createWriteStream, createReadStream } from "fs";
import { Readable, Transform } from "stream";

function concatFiles(dest, files) {
  return new Promise((resolve, reject) => {
    const destStream = createWriteStream(dest);
    Readable.from(files)
      .pipe(new Transform({
        objectMode: true,
        transform: (chunk, _, done) => {
          const src = createReadStream(chunk);
          src.pipe(destStream, { end: false });
          src.on("error", done);
          src.on("end", done);
        },
      }))
      .on("error", reject)
      .on("finish", () => {
        destStream.end();
        resolve();
      });
  });
}
```
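
A possible usage of concatFiles (a hypothetical command that takes the destination file as the first argument and the source files after it):

```js
async function main () {
  const [dest, ...files] = process.argv.slice(2);
  await concatFiles(dest, files);
  console.log('All files concatenated');
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```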

Unordered parallel execution

```js
import { Transform } from "stream";

class ParallelStream extends Transform {
  constructor (userTransform, opts) {
    super({ objectMode: true, ...opts });
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCb = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(
      chunk,
      enc,
      this.push.bind(this),
      this._onComplete.bind(this),
    );
    // done() is invoked immediately, without waiting for userTransform to finish
    done();
  }

  _flush(done) {
    if (this.running > 0) {
      // delay the end of the stream until all running tasks have completed
      this.terminateCb = done;
    } else {
      done();
    }
  }

  _onComplete (err) {
    this.running--;
    if (err) {
      return this.emit("error", err);
    }
    if (this.running === 0) {
      this.terminateCb && this.terminateCb();
    }
  }
}

// -----------------------------------------

import { pipeline } from "stream";
import { createReadStream, createWriteStream } from "fs";
import split from "split";
import superagent from "superagent";

pipeline(
  createReadStream(process.argv[2]),
  split(),
  new ParallelStream(
    async (url, enc, push, done) => {
      if (!url) {
        return done();
      }
      try {
        await superagent.head(url, { timeout: 5 * 1000 });
        push(`${url} is up\n`);
      } catch (err) {
        push(`${url} is down\n`);
      }
      done();
    },
  ),
  createWriteStream('results.txt'),
  (err) => {
    if (err) {
      console.error(err);
      process.exit(1);
    }
    console.log('All urls have been checked');
  }
);
```

Unordered limited parallel execution

```js
import { Transform } from "stream";

class LimitedParallelStream extends Transform {
  constructor (concurrency, userTransform, opts) {
    super({ objectMode: true, ...opts });
    this.userTransform = userTransform;
    this.concurrency = concurrency;
    this.running = 0;
    this.continueCb = null;
    this.terminateCb = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(
      chunk,
      enc,
      this.push.bind(this),
      this._onComplete.bind(this),
    );
    if (this.running < this.concurrency) {
      done();
    } else {
      // concurrency limit reached: hold back done() until a task completes
      this.continueCb = done;
    }
  }

  _flush(done) {
    if (this.running > 0) {
      this.terminateCb = done;
    } else {
      done();
    }
  }

  _onComplete (err) {
    this.running--;
    if (err) {
      return this.emit("error", err);
    }
    const tmpCb = this.continueCb;
    this.continueCb = null;
    tmpCb && tmpCb();
    if (this.running === 0) {
      this.terminateCb && this.terminateCb();
    }
  }
}
```
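
LimitedParallelStream is a drop-in replacement for ParallelStream in the URL-checking pipeline shown earlier; only the construction changes (4 is an arbitrary concurrency limit):

```js
// same pipeline as before, but with at most 4 checks running at a time
new LimitedParallelStream(4, async (url, enc, push, done) => {
  if (!url) {
    return done();
  }
  try {
    await superagent.head(url, { timeout: 5 * 1000 });
    push(`${url} is up\n`);
  } catch (err) {
    push(`${url} is down\n`);
  }
  done();
})
```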

Ordered parallel execution

parallel-transform - allows us to run our transforms in parallel without changing the order of the output.

parallel-transform implementation opts for predictable memory utilization and maintains an internal buffer that will not grow more than the specified maximum concurrency.

```js
import { pipeline } from "stream";
import { createReadStream, createWriteStream } from "fs";
import split from "split";
// using superagent for the HTTP checks, as in the previous examples
import superagent from "superagent";
import parallelTransform from 'parallel-transform';

pipeline(
  createReadStream(process.argv[2]),
  split(),
  parallelTransform(4, async function (url, done) {
    if (!url) {
      return done();
    }
    console.log(url);
    try {
      await superagent.head(url, { timeout: 5 * 1000 });
      this.push(`${url} is up\n`);
    } catch (err) {
      this.push(`${url} is down\n`);
    }
    done();
  }),
  createWriteStream('results.txt'),
  (err) => {
    if (err) {
      console.error(err);
      process.exit(1);
    }
    console.log('All urls have been checked');
  }
);
```

# Piping patterns

Combining streams

Advantages of a combined stream:

  1. We can redistribute it as a black box by hiding its internal pipeline.
  2. We have simplified error management, as we don't have to attach an error listener to each stream in the pipeline, but just to the combined stream itself.

Using the pumpify package, a combined stream can be created as follows: const combinedStream = pumpify(streamA, streamB, streamC)

```js
import { createGzip, createGunzip } from "zlib";
import {
  createCipheriv,
  createDecipheriv,
  scryptSync,
  randomBytes,
} from "crypto";
import { Transform } from "stream";
import pumpify from "pumpify";

function createPassword(password) {
  // derive a 24-byte key (aes192) from the password
  return scryptSync(password, "salt", 24);
}

function generateIV() {
  return randomBytes(16);
}

class PrependIV extends Transform {
  constructor(iv, options) {
    super(options);
    this.iv = iv;
    this.ivPrepended = false;
  }

  _transform(chunk, _, done) {
    if (!this.ivPrepended) {
      // emit the IV once, before any data
      this.push(this.iv);
      this.ivPrepended = true;
    }
    this.push(chunk);
    done();
  }
}

function createCompressAndEncrypt(password) {
  const key = createPassword(password);
  const iv = generateIV();
  const prependedStream = new PrependIV(iv);
  // gzip the data, encrypt it, and prepend the cleartext IV to the output,
  // so that the receiving side can read the IV before deciphering
  const combinedStream = pumpify(
    createGzip(),
    createCipheriv("aes192", key, iv),
    prependedStream
  );
  return combinedStream;
}

function createDecryptAndDecompress(password) {
  const key = createPassword(password);
  let decipher = null;

  // read the cleartext IV from the first 16 bytes, then decrypt the rest
  // (assumes the first chunk is at least 16 bytes long)
  const decryptStream = new Transform({
    transform(chunk, _, done) {
      if (!decipher) {
        const iv = chunk.slice(0, 16);
        decipher = createDecipheriv("aes192", key, iv);
        chunk = chunk.slice(16);
      }
      done(null, decipher.update(chunk));
    },
    flush(done) {
      done(null, decipher ? decipher.final() : Buffer.alloc(0));
    },
  });

  return pumpify(decryptStream, createGunzip());
}
```

```js
// COMPRESS
// (createCompressAndEncrypt comes from the previous snippet)

import { pipeline } from 'stream';
import { createReadStream, createWriteStream } from 'fs';

const [,, password, source] = process.argv;
const destination = `${source}.gz.enc`;

pipeline(
  createReadStream(source),
  createCompressAndEncrypt(password),
  createWriteStream(destination),
  (err) => {
    if (err) {
      console.error(err);
      process.exit(1);
    }
    console.log(`${destination} created`);
  }
);
```

```js
// DECOMPRESS
// (createDecryptAndDecompress comes from the previous snippet)

import { pipeline } from 'stream';
import { createReadStream, createWriteStream } from 'fs';

const [,, password, source] = process.argv;
const destination = source.replace('.gz.enc', '.out');

pipeline(
  createReadStream(source),
  createDecryptAndDecompress(password),
  createWriteStream(destination),
  (err) => {
    if (err) {
      console.error(err);
      process.exit(1);
    }
    console.log(`${destination} created`);
  }
);
```

Forking streams

This is useful:

  • when we want to send the same data to different destinations
  • when we want to perform different transformations on the same data
  • when we want to split the data based on some criteria

```js
import { createReadStream, createWriteStream } from 'fs';
import { createHash } from 'crypto';

const filename = process.argv[2];
const sha1Stream = createHash('sha1').setEncoding('hex');
const md5Stream = createHash('md5').setEncoding('hex');
const inputStream = createReadStream(filename);

inputStream
  .pipe(sha1Stream)
  .pipe(createWriteStream(`${filename}.sha1`));

inputStream
  .pipe(md5Stream)
  .pipe(createWriteStream(`${filename}.md5`));
```

  • Both destination streams will be ended automatically when inputStream ends, unless { end: false } is specified when piping.
  • The two forks of the stream will receive the same data chunks.
  • If one destination pauses the source stream to handle backpressure for a long time, all the other destinations will be waiting as well.
  • If we pipe to an additional stream after we've started consuming the data at source (async piping), the new stream will only receive new chunks of data.

Merging streams

  • Use the option { end: false } when piping multiple sources to a single destination and then invoke end() on the destination only when all the sources have completed reading.

```js
import { createReadStream, createWriteStream } from 'fs';
import split from 'split';

const dest = process.argv[2];
const sources = process.argv.slice(3);

const destStream = createWriteStream(dest);

let endCount = 0;
for (const source of sources) {
  const sourceStream = createReadStream(source, { highWaterMark: 16 });
  sourceStream.on("end", () => {
    if (++endCount === sources.length) {
      destStream.end();
      console.log(`${dest} created`);
    }
  });
  sourceStream
    .pipe(split(line => line + "\n"))
    .pipe(destStream, { end: false });
}
```

To merge streams in order, we can modify the code as follows:

```js
import { createReadStream, createWriteStream } from 'fs';
import split from 'split';

const dest = process.argv[2];
const sources = process.argv.slice(3);

const destStream = createWriteStream(dest);

function processStream(index) {
  if (index >= sources.length) {
    destStream.end();
    console.log(`${dest} created`);
    return;
  }

  const sourceStream = createReadStream(sources[index], { highWaterMark: 16 });
  sourceStream.on("end", () => processStream(index + 1));
  sourceStream
    .pipe(split(line => line + "\n"))
    .pipe(destStream, { end: false });
}

processStream(0);
```

multistream - package that deals with merging streams in order.
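
A sketch based on multistream's documented usage (the file names are just placeholders):

```js
import MultiStream from 'multistream';
import { createReadStream } from 'fs';

const streams = [
  createReadStream('file1.txt'),
  createReadStream('file2.txt'),
  createReadStream('file3.txt')
];

// the streams are consumed one after the other, so their order is preserved
new MultiStream(streams).pipe(process.stdout);
```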

Multiplexing and demultiplexing

The operation of combining multiple streams (in this case, also known as channels) to allow transmission over a single stream is called multiplexing. The process of reconstructing the original streams from the data received from a shared stream is called demultiplexing. The devices that perform these operations are called multiplexer (or mux) and demultiplexer (or demux), respectively.

Multiplexing

```js
import { fork } from "child_process";
import { connect } from "net";

function multiplexChannels(sources, destination) {
  let openChannels = sources.length;
  for (let i = 0; i < sources.length; i++) {
    sources[i]
      .on("readable", function () {
        let chunk;
        while ((chunk = this.read()) !== null) {
          // packet format: 1 byte channel ID, 4 bytes payload length, payload
          const outBuffer = Buffer.alloc(1 + 4 + chunk.length);
          outBuffer.writeUInt8(i, 0);
          outBuffer.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuffer, 5);
          console.log(`Sending packet to channel: ${i}`);
          destination.write(outBuffer);
        }
      })
      .on("end", () => {
        if (--openChannels === 0) {
          destination.end();
        }
      });
  }
}

// Create a TCP client connection to localhost:3000
const socket = connect(3000, () => {
  const child = fork(
    process.argv[2],
    process.argv.slice(3),
    // ensures the child process does not inherit the stdout and stderr of the parent
    { silent: true }
  );
  multiplexChannels([child.stdout, child.stderr], socket);
});
```

Demultiplexing

```js
import { createWriteStream } from "fs";
import { createServer } from "net";

function demultiplexChannel(source, destinations) {
  let currentChannel = null;
  let currentLength = null;

  source
    .on("readable", () => {
      let chunk;
      if (currentChannel === null) {
        // read the 1-byte channel ID
        chunk = source.read(1);
        currentChannel = chunk && chunk.readUInt8(0);
      }

      if (currentLength === null) {
        // read the 4-byte payload length
        chunk = source.read(4);
        currentLength = chunk && chunk.readUInt32BE(0);
        if (currentLength === null) {
          return null;
        }
      }

      // read the payload itself
      chunk = source.read(currentLength);
      if (chunk === null) {
        return null;
      }

      console.log(`Received packet from: ${currentChannel}`);
      destinations[currentChannel].write(chunk);
      currentChannel = null;
      currentLength = null;
    })
    .on("end", () => {
      destinations.forEach(destination => {
        destination.end();
      });
      console.log("Source channel closed");
    });
}

const server = createServer(socket => {
  const stdoutStream = createWriteStream('stdout.log');
  const stderrStream = createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
});

server.listen(3000, () => console.log("Server started"));
```

ternary-stream - a package that implements an if...else statement for streams.
