# Buffer vs stream
With buffer mode, for an input operation, all the data coming from a resource over time is collected into a buffer until the operation completes; it is then passed back to the caller as one single blob of data.
With streams, the data is processed as soon as it arrives from the resource.
Streams are more efficient both in terms of space (memory usage) and time (processing time).
## Spatial efficiency
Buffers in V8 have a limited size. To find out the maximum size of a buffer:

```js
import buffer from 'buffer';

console.log(buffer.constants.MAX_LENGTH);
```

Trying to read a file that is bigger than the maximum allowed buffer size results in an error like:

```
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (8130792448) is greater
than possible Buffer: 2147483647 bytes
```
Example: Gzip using buffers

```js
import { promises as fs } from 'fs';
import { gzip } from 'zlib';
import { promisify } from 'util';

const gzipPromise = promisify(gzip);
const filename = process.argv[2];

async function main () {
  const data = await fs.readFile(filename);
  const gzippedData = await gzipPromise(data);
  await fs.writeFile(`${filename}.gz`, gzippedData);
  console.log('File successfully compressed');
}

main();
```
Gzip using streams: this code runs smoothly against files of any size and with constant memory utilization.

```js
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

const filename = process.argv[2];

createReadStream(filename)
  .pipe(createGzip())
  .pipe(createWriteStream(`${filename}.gz`))
  .on('finish', () => console.log('File successfully compressed'));
```
## Time efficiency
Consider an application where the client compresses a file and uploads it to a remote HTTP server, which, in turn, decompresses it and saves it on the filesystem.
Implementing this using streams:
```js
// gzip-receive.js
import { createServer } from 'http';
import { createWriteStream } from 'fs';
import { createGunzip } from 'zlib';
import { basename, join } from 'path';
import { createDecipheriv, randomBytes } from 'crypto';

// generate a shared secret; the client must use the same one to encrypt
const secret = randomBytes(24);
console.log(`Generated secret: ${secret.toString('hex')}`);

const server = createServer((req, res) => {
  // note: Node.js lowercases incoming header names
  // basename is used to remove any possible path from the name of the received file
  const filename = basename(req.headers['x-filename']);
  const iv = Buffer.from(
    req.headers['x-initialization-vector'], 'hex'
  );
  const destFilename = join('received_files', filename);
  console.log(`File request received: ${filename}`);

  // req is a stream object
  req.pipe(createDecipheriv('aes192', secret, iv))
    .pipe(createGunzip())
    .pipe(createWriteStream(destFilename))
    .on('finish', () => {
      res.writeHead(201, { 'Content-Type': 'text/plain' });
      res.end('OK\n');
      console.log(`File saved: ${destFilename}`);
    });
});

server.listen(3000, () => console.log('Listening on http://localhost:3000'));
```
```js
// gzip-send.js
import { request } from 'http';
import { createGzip } from 'zlib';
import { createReadStream } from 'fs';
import { basename } from 'path';
import { createCipheriv, randomBytes } from 'crypto';

const filename = process.argv[2];
const serverHost = process.argv[3];
const secret = Buffer.from(process.argv[4], 'hex');
const iv = randomBytes(16);

const httpRequestOptions = {
  hostname: serverHost,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip',
    'X-Filename': basename(filename),
    'X-Initialization-Vector': iv.toString('hex')
  },
};

const req = request(httpRequestOptions, (res) => {
  console.log(`Server response: ${res.statusCode}`);
});

// read the data from the file with a stream, then compress, encrypt, and send
// each chunk as soon as it is read from the filesystem
createReadStream(filename)
  .pipe(createGzip())
  .pipe(createCipheriv('aes192', secret, iv))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });
```
With streams, the whole pipeline works like an assembly line: as soon as the first chunk is read, it can be compressed, encrypted, and sent while the following chunks are still being read, so all the stages operate in parallel on different chunks (each stage is asynchronous). Node.js streams also take care of preserving the order of the chunks. As a result, the entire process takes less time to complete.
# Stream basics
Every stream in Node.js is an implementation of one of the four base abstract classes available in the `stream` core module:
- `Readable`
- `Writable`
- `Duplex`
- `Transform`

Each stream class is also an instance of `EventEmitter`.

Streams support two operating modes:
- Binary mode: to stream data in the form of chunks, such as buffers or strings
- Object mode: to stream data as a sequence of discrete objects
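A minimal sketch contrasting the two modes:

```js
import { Readable } from 'stream';

// Binary mode (the default): chunks are Buffers (or strings)
const binaryStream = new Readable({
  read() {
    this.push('some bytes');
    this.push(null);
  }
});
binaryStream.on('data', (chunk) => console.log(Buffer.isBuffer(chunk))); // true

// Object mode: chunks are arbitrary JavaScript objects
const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1, name: 'first record' });
    this.push(null);
  }
});
objectStream.on('data', (obj) => console.log(obj.name)); // 'first record'
```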
# Readable streams
## Reading from a stream

### Non-flowing mode
This is the default operating mode for streams: data is pulled on demand with `readable.read([size])`.

```js
process.stdin
  .on("readable", () => {
    let chunk;
    console.log("New data available");
    // read() synchronously pulls data from the internal buffer until it is emptied;
    // chunk is a Buffer object if the stream is working in binary mode
    while ((chunk = process.stdin.read()) !== null) {
      console.log(`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`);
    }
  })
  .on("end", () => {
    console.log("End of stream");
  });
```

To read strings instead of buffers from the stream, call `readable.setEncoding(encoding)` on the stream itself.
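For example:

```js
// emit utf8-decoded strings instead of Buffer objects
process.stdin.setEncoding('utf8');
```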
### Flowing mode
Data is pushed to the `data` event listeners as soon as it arrives.

```js
process.stdin
  .on("data", (chunk) => {
    console.log("New data available");
    console.log(
      `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
    );
  })
  // pause() stops the stream from emitting data events: incoming data is
  // buffered in the internal buffer (the stream switches back to non-flowing mode)
  .pause()
  // resume() switches the stream back to flowing mode
  .resume()
  .on("end", () => {
    console.log("End of stream");
  });
```
### Async iterators

```js
async function main () {
  for await (const chunk of process.stdin) {
    console.log('New data available');
    console.log(
      `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
    );
  }
  console.log('End of stream');
}

main();
```
## Readable stream implementation

```js
import { Readable } from "stream";
import Chance from "chance";

const chance = new Chance();

export class RandomStream extends Readable {
  constructor(options) {
    super(options);
    this.emittedBytes = 0;
  }

  _read (size) {
    const chunk = chance.string({ length: size });
    this.push(chunk, "utf-8");
    this.emittedBytes += chunk.length;
    if (chance.bool({ likelihood: 5 })) {
      // End the stream
      this.push(null);
    }
  }
}

const randomStream = new RandomStream();
randomStream
  .on("data", (chunk) => {
    console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`);
  })
  .on("end", () => {
    console.log(`Produced ${randomStream.emittedBytes} bytes of random data`);
  });
```
```ts
interface StreamOptions<T extends Stream> extends Abortable {
  emitClose?: boolean | undefined;
  // The upper limit of the data stored in the internal buffer,
  // after which no more reading from the source should be done.
  // Defaults to 16KB
  highWaterMark?: number | undefined;
  // A flag to enable object mode -> defaults to false
  objectMode?: boolean | undefined;
  construct?(this: T, callback: (error?: Error | null) => void): void;
  destroy?(this: T, error: Error | null, callback: (error: Error | null) => void): void;
  autoDestroy?: boolean | undefined;
}

interface ReadableOptions extends StreamOptions<Readable> {
  // Used to convert buffers into strings -> defaults to null
  encoding?: BufferEncoding | undefined;
  read?(this: Readable, size: number): void;
}
```
⭐ `Readable.from()` -> creates `Readable` streams from arrays or other iterable objects (generators, iterators, and async iterators).
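For example:

```js
import { Readable } from 'stream';

// each array element becomes a chunk (object mode is enabled by default)
const mountainsStream = Readable.from(['Everest', 'K2', 'Kangchenjunga']);
mountainsStream.on('data', (chunk) => console.log(chunk));
```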
## Simplified readable

```js
import { Readable } from 'stream';
import Chance from 'chance';

const chance = new Chance();
let emittedBytes = 0;

const randomStream = new Readable({
  read (size) {
    const chunk = chance.string({ length: size })
    this.push(chunk, 'utf8')
    emittedBytes += chunk.length
    if (chance.bool({ likelihood: 5 })) {
      this.push(null)
    }
  }
});
```
# Writable streams
`writable.write(chunk, [encoding], [callback])`

`writable.end([chunk], [encoding], [callback])` -> here, the callback function is equivalent to registering a listener for the `finish` event, which is fired when all the data written to the stream has been flushed into the underlying resource.
```js
import { createServer } from "http";
import Chance from "chance";

const chance = new Chance();

const server = createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  while (chance.bool({ likelihood: 95 })) {
    res.write(`${chance.string()}\n`);
  }
  res.end("\n\n");
  res.on("finish", () => {
    console.log("All data sent");
  });
});

server.listen(8080, () => {
  console.log("listening on http://localhost:8080");
});
```
## Backpressure
When the size of the internal buffer exceeds `highWaterMark`, `writable.write()` returns `false`. Once the buffer is emptied, the `drain` event is emitted, signalling that it's safe to start writing again.
```js
import { createServer } from "http";
import Chance from "chance";

const chance = new Chance();

const server = createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain' });

  function generateMore () {
    while (chance.bool({ likelihood: 95 })) {
      const randomChunk = chance.string({
        length: (16 * 1024) - 1
      });
      const shouldContinue = res.write(`${randomChunk}`);
      if (!shouldContinue) {
        console.log("back-pressure");
        // resume writing only once the internal buffer has been drained
        return res.once("drain", generateMore);
      }
    }
    res.end("\n\n");
  }

  generateMore();
  res.on("finish", () => console.log('All data sent'));
});

server.listen(8080, () => console.log("listening on http://localhost:8080"));
```
## Writable stream implementation

```js
import { Writable } from "stream";
import { promises as fs } from "fs";
import { dirname, join } from "path";
import mkdirp from "mkdirp-promise";

class ToFileStream extends Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _write(chunk, encoding, cb) {
    mkdirp(dirname(chunk.path))
      .then(() => fs.writeFile(chunk.path, chunk.content))
      .then(() => cb())
      .catch(cb);
  }
}

const tfs = new ToFileStream();

tfs.write({ path: join('files', 'file1.txt'), content: 'Hello' });
tfs.write({ path: join('files', 'file2.txt'), content: 'Node.js' });
tfs.write({ path: join('files', 'file3.txt'), content: 'streams' });
tfs.end(() => console.log('All files created'));
```
```ts
interface WritableOptions extends StreamOptions<Writable> {
  decodeStrings?: boolean | undefined;
  defaultEncoding?: BufferEncoding | undefined;
  write?(this: Writable, chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void;
  writev?(
    this: Writable,
    chunks: Array<{
      chunk: any;
      encoding: BufferEncoding;
    }>,
    callback: (error?: Error | null) => void
  ): void;
  final?(this: Writable, callback: (error?: Error | null) => void): void;
}
```
## Simplified writable

```js
const tfs = new Writable({
  objectMode: true,
  write (chunk, encoding, cb) {
    mkdirp(dirname(chunk.path))
      .then(() => fs.writeFile(chunk.path, chunk.content))
      .then(() => cb())
      .catch(cb)
  }
});
```
# Duplex streams
A Duplex stream is a stream that is both `Readable` and `Writable`.
- Used for an entity that is both a data source and a data destination
- Inherits from both `stream.Readable` and `stream.Writable`
- Both `read()` and `write()` methods are available
- Can listen for both `readable` and `drain` events
- To build a `Duplex` stream, we need to provide both the `_read()` and `_write()` methods (a minimal sketch follows this list)

Options of importance:
- `allowHalfOpen` -> defaults to `true`; if set to `false`, both parts of the stream will end as soon as one of them does
- `readableObjectMode`
- `writableObjectMode`
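A toy sketch of a custom `Duplex` stream (not from the original text): whatever is written to the writable side is made available again on the readable side.

```js
import { Duplex } from 'stream';

class EchoDuplex extends Duplex {
  _write(chunk, encoding, callback) {
    // forward every written chunk to the readable side
    this.push(chunk);
    callback();
  }

  _read(size) {
    // nothing to do here: data is pushed as soon as it is written
  }

  _final(callback) {
    // end the readable side when the writable side ends
    this.push(null);
    callback();
  }
}

const echo = new EchoDuplex();
echo.on('data', (chunk) => console.log(`echoed: ${chunk.toString()}`));
echo.write('hello');
echo.end();
```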
# Transform streams
Transform streams are a special kind of `Duplex` stream, specifically designed to handle data transformations.
Examples: `zlib.createGzip()`, `crypto.createCipheriv()`

The `Transform` class already wires up `_read()` and `_write()` internally; to build a `Transform` stream we only need to provide:
- `_transform()`
- `_flush()`
## Transform stream implementation
Implement a Transform stream that replaces all the occurrences of a given string:

```js
import { Transform } from "stream";

export class ReplaceStream extends Transform {
  constructor(searchStr, replaceStr, options) {
    super({ ...options });
    this.searchStr = searchStr;
    this.replaceStr = replaceStr;
    // holds the trailing piece of the previous chunk, in case a match
    // is split across two chunks
    this.tail = "";
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tail + chunk).split(this.searchStr);
    const lastPiece = pieces[pieces.length - 1];
    const tailLen = this.searchStr.length - 1;
    this.tail = lastPiece.slice(-tailLen);
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen);
    this.push(pieces.join(this.replaceStr));
    callback();
  }

  _flush(callback) {
    // emit whatever is left in the tail before the stream ends
    this.push(this.tail);
    callback();
  }
}

// replace every newline character with ":)"
const replaceStream = new ReplaceStream("\n", ":)");
replaceStream.on("data", (chunk) => {
  console.log(chunk.toString());
});

replaceStream.write("Hello\nWo");
replaceStream.write("rld\n");
replaceStream.write("This is a test\n");
replaceStream.end();
```
## Filtering and aggregating data with Transform streams

```csv
type,country,profit
Household,Namibia,597290.92
Baby Food,Iceland,808579.10
Meat,Russia,277305.60
Meat,Italy,413270.00
Cereal,Malta,174965.25
Meat,Indonesia,145402.40
Household,Italy,728880.54
```

```js
import { createReadStream } from "fs";
import { parse } from "csv-parse";
import { Transform } from "stream";

class FilterByCountry extends Transform {
  constructor(country, options = {}) {
    options.objectMode = true;
    super(options);
    this.country = country;
  }

  _transform(record, encoding, callback) {
    // forward only the records matching the given country
    if (record.country === this.country) {
      this.push(record);
    }
    callback();
  }
}

class SumProfit extends Transform {
  constructor(options = {}) {
    options.objectMode = true;
    super(options);
    this.total = 0;
  }

  _transform(record, encoding, callback) {
    this.total += Number.parseFloat(record.profit);
    callback();
  }

  _flush(callback) {
    // emit the aggregated total only once the input is exhausted
    this.push(this.total.toString());
    callback();
  }
}

const csvParse = parse({ columns: true });

createReadStream("./data.csv")
  .pipe(csvParse)
  .pipe(new FilterByCountry("Italy"))
  .pipe(new SumProfit())
  .pipe(process.stdout);
```
# Passthrough Streams
A `PassThrough` stream is a special type of Transform that outputs every data chunk without applying any transformation.

## Observability

```js
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { PassThrough } from 'stream';

let bytesWritten = 0;
const monitor = new PassThrough();
monitor.on('data', (chunk) => {
  bytesWritten += chunk.length;
});
monitor.on('finish', () => {
  console.log(`${bytesWritten} bytes written`);
});

// the monitor can be written to directly...
monitor.write('Hello!');
monitor.end();

// ...or (with a fresh instance) placed in the middle of a pipeline
// to observe the data flowing through it
const filename = process.argv[2];
createReadStream(filename)
  .pipe(createGzip())
  .pipe(monitor)
  .pipe(createWriteStream(`${filename}.gz`));
```
## Late piping
Use a `PassThrough` stream when you need to provide a placeholder for data that will be read or written in the future.

```js
import { createReadStream } from 'fs';
import { createBrotliCompress } from 'zlib';
import { PassThrough } from 'stream';
import { basename } from 'path';
import { upload } from './upload.js';

const filepath = process.argv[2];
const filename = basename(filepath);
const contentStream = new PassThrough();

// start the upload right away, passing the placeholder stream as the content
upload(`${filename}.br`, contentStream)
  .then((response) => {
    console.log(`Server response: ${response.data}`);
  })
  .catch((err) => {
    console.error(err);
    process.exit(1);
  });

// the actual data starts flowing into the placeholder only now
createReadStream(filepath)
  .pipe(createBrotliCompress())
  .pipe(contentStream);
```

Alternate implementation:

```js
function createUploadStream (filename) {
  const connector = new PassThrough();
  upload(filename, connector);
  return connector;
}

const uploadStream = createUploadStream('a-file.txt');
uploadStream.write('Hello World');
uploadStream.end();
```
# Lazy streams
❓ Why are lazy streams required?
Creating a stream instance might initialize expensive operations straight away (for example, open a file or a socket, initialize a connection to a database, and so on), even before we actually start to use such a stream. This might not be desirable if we are creating a large number of stream instances for later consumption. For this reason, we should consider lazy implementation of streams.
A library such as `lazystream` allows us to effectively create proxies for actual stream instances, where the proxied instance is not created until some piece of code actually starts consuming data from the proxy.
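A minimal sketch with the `lazystream` package (assuming it is installed; the file path is made up for illustration):

```js
import fs from 'fs';
import lazystream from 'lazystream';

// the factory function is invoked, and the real file stream created,
// only when someone starts reading from the proxy
const lazyReadable = new lazystream.Readable(function () {
  return fs.createReadStream('path/to/file.txt');
});

lazyReadable.pipe(process.stdout);
```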
## Custom implementation

```js
// LazyStream.js
import { Readable, Writable } from 'stream';

export class LazyStream {
  constructor(streamFactory) {
    this.streamFactory = streamFactory;
    this.stream = null;
  }

  _initializeStream() {
    // create the underlying stream only on first use
    if (!this.stream) {
      this.stream = this.streamFactory();
    }
  }

  createReadableStream() {
    const self = this;
    return new Readable({
      read(size) {
        // note: backpressure is not propagated in this simple sketch
        if (!self.stream) {
          self._initializeStream();
          self.stream.on('data', (chunk) => this.push(chunk));
          self.stream.on('end', () => this.push(null));
        }
      },
    });
  }

  createWritableStream() {
    const self = this;
    return new Writable({
      write(chunk, encoding, callback) {
        self._initializeStream();
        self.stream.write(chunk, encoding, callback);
      },
    });
  }
}

// ------------------------------
// usage (in a separate module)
import fs from 'fs';
import { LazyStream } from './LazyStream.js';

const createReadStream = () => fs.createReadStream('path/to/file.txt');
const lazyReadStream = new LazyStream(createReadStream);
const readable = lazyReadStream.createReadableStream();
readable.pipe(process.stdout);

const createWriteStream = () => fs.createWriteStream('path/to/output.txt');
const lazyWriteStream = new LazyStream(createWriteStream);
const writable = lazyWriteStream.createWritableStream();
process.stdin.pipe(writable);
```
# Pipes and error handling
`pipe()` does not propagate errors automatically, so we have to handle them on each stream. The following implementation ensures that all the allocated resources are properly released and the error is handled gracefully:

```js
function handleError (err) {
  console.error(err);
  // destroy both streams to release their underlying resources
  stream1.destroy();
  stream2.destroy();
}

stream1
  .on('error', handleError)
  .pipe(stream2)
  .on('error', handleError);
```
## Error handling with pipeline

`pipeline(stream1, stream2, stream3, ... , cb)`

All the streams are properly destroyed when the pipeline completes successfully or when it's interrupted by an error.

Example:

```js
import { createGzip, createGunzip } from "zlib";
import { Transform, pipeline } from "stream";

const uppercasify = new Transform({
  transform (chunk, enc, cb) {
    this.push(chunk.toString().toUpperCase());
    cb();
  }
});

pipeline(
  process.stdin,
  createGunzip(),
  uppercasify,
  createGzip(),
  process.stdout,
  (err) => {
    if (err) {
      console.error(err);
      process.exit(1);
    }
  },
);
```
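Assuming the script above is saved as uppercasify-gzipped.js, it can be tried with a command like `echo 'Hello World!' | gzip | node uppercasify-gzipped.js | gunzip`.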
# Asynchronous control flow patterns with streams
## Sequential execution

```js
import { createWriteStream, createReadStream } from "fs";
import { Readable, Transform } from "stream";

function concatFiles(dest, files) {
  return new Promise((resolve, reject) => {
    const destStream = createWriteStream(dest);
    Readable.from(files)
      .pipe(new Transform({
        objectMode: true,
        transform: (chunk, _, done) => {
          // pipe each source file into the destination, keeping it open
          const src = createReadStream(chunk);
          src.pipe(destStream, { end: false });
          src.on("error", done);
          // invoke done() only when the current file has been fully read,
          // so the files are processed one after another
          src.on("end", done);
        },
      }))
      .on("error", reject)
      .on("finish", () => {
        destStream.end();
        resolve();
      });
  });
}
```
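A hypothetical usage of `concatFiles` (file names are made up for illustration):

```js
concatFiles('all.txt', ['part1.txt', 'part2.txt', 'part3.txt'])
  .then(() => console.log('All files concatenated'))
  .catch((err) => {
    console.error(err);
    process.exit(1);
  });
```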
## Unordered parallel execution

```js
import { Transform } from "stream";

class ParallelStream extends Transform {
  constructor (userTransform, opts) {
    super({ objectMode: true, ...opts });
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCb = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(
      chunk,
      enc,
      this.push.bind(this),
      this._onComplete.bind(this),
    );
    // invoke done() immediately, without waiting for userTransform to finish,
    // so the next chunk can be processed in parallel
    done();
  }

  _flush(done) {
    if (this.running > 0) {
      // delay the end of the stream until all running tasks have completed
      this.terminateCb = done;
    } else {
      done();
    }
  }

  _onComplete (err) {
    this.running--;
    if (err) {
      return this.emit("error", err);
    }
    if (this.running === 0) {
      this.terminateCb && this.terminateCb();
    }
  }
}

// -----------------------------------------
import { pipeline } from "stream";
import { createReadStream, createWriteStream } from "fs";
import split from "split";
import superagent from "superagent";

pipeline(
  createReadStream(process.argv[2]),
  split(),
  new ParallelStream(
    async (url, enc, push, done) => {
      if (!url) {
        return done();
      }
      try {
        await superagent.head(url).timeout(5 * 1000);
        push(`${url} is up\n`);
      } catch (err) {
        push(`${url} is down\n`);
      }
      done();
    },
  ),
  createWriteStream('results.txt'),
  (err) => {
    if (err) {
      console.error(err);
      process.exit(1);
    }
    console.log('All urls have been checked');
  }
)
```
## Unordered limited parallel execution

```js
class LimitedParallelStream extends Transform {
  constructor (concurrency, userTransform, opts) {
    super({ objectMode: true, ...opts });
    this.userTransform = userTransform;
    this.concurrency = concurrency;
    this.running = 0;
    this.continueCb = null;
    this.terminateCb = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(
      chunk,
      enc,
      this.push.bind(this),
      this._onComplete.bind(this),
    );
    if (this.running < this.concurrency) {
      done();
    } else {
      // concurrency limit reached: ask for the next chunk only when a slot frees up
      this.continueCb = done;
    }
  }

  _flush(done) {
    if (this.running > 0) {
      this.terminateCb = done;
    } else {
      done();
    }
  }

  _onComplete (err) {
    this.running--;
    if (err) {
      return this.emit("error", err);
    }
    const tmpCb = this.continueCb;
    this.continueCb = null;
    tmpCb && tmpCb();
    if (this.running === 0) {
      this.terminateCb && this.terminateCb();
    }
  }
}
```
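In the URL-checking pipeline shown above, `new ParallelStream(...)` can simply be swapped for `new LimitedParallelStream(4, ...)` (same task function, with the concurrency limit as the first argument) to cap the number of URLs being checked at the same time.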
## Ordered parallel execution
The `parallel-transform` package allows us to run our transforms in parallel without changing the order of the output. Its implementation opts for predictable memory utilization and maintains an internal buffer that will not grow more than the specified maximum concurrency.

```js
import { pipeline } from "stream";
import { createReadStream, createWriteStream } from "fs";
import split from "split";
import superagent from "superagent";
import parallelTransform from 'parallel-transform';

pipeline(
  createReadStream(process.argv[2]),
  split(),
  // 4 is the maximum concurrency; results are pushed in the original order
  parallelTransform(4, async function (url, done) {
    if (!url) {
      return done();
    }
    console.log(url);
    try {
      // using superagent for the HTTP check, as in the previous example
      await superagent.head(url).timeout(5 * 1000);
      this.push(`${url} is up\n`);
    } catch (err) {
      this.push(`${url} is down\n`);
    }
    done();
  }),
  createWriteStream('results.txt'),
  (err) => {
    if (err) {
      console.error(err);
      process.exit(1);
    }
    console.log('All urls have been checked');
  }
);
```
# Piping patterns
## Combining streams
Advantages of a combined stream:
- We can redistribute it as a black box by hiding its internal pipeline.
- We have simplified error management, as we don't have to attach an error listener to each stream in the pipeline, but just to the combined stream itself.

Using the `pumpify` package: `const combinedStream = pumpify(streamA, streamB, streamC)`
```js
import { createGzip, createGunzip } from "zlib";
import {
  createCipheriv,
  createDecipheriv,
  scryptSync,
  randomBytes,
} from "crypto";
import { Transform } from "stream";
import pumpify from "pumpify";

function createKey(password) {
  return scryptSync(password, "salt", 24);
}

function generateIV() {
  return randomBytes(16);
}

// pushes the (unencrypted) IV before the first chunk of data that flows through,
// so that the decrypting side can recover it
class PrependIV extends Transform {
  constructor(iv, options) {
    super(options);
    this.iv = iv;
    this.ivPrepended = false;
  }

  _transform(chunk, _, done) {
    if (!this.ivPrepended) {
      this.push(this.iv);
      this.ivPrepended = true;
    }
    this.push(chunk);
    done();
  }
}

export function createCompressAndEncrypt(password) {
  const key = createKey(password);
  const iv = generateIV();
  // gzip first, then encrypt, then prepend the plain IV to the ciphertext
  return pumpify(
    createGzip(),
    createCipheriv("aes192", key, iv),
    new PrependIV(iv)
  );
}

export function createDecryptAndDecompress(password) {
  const key = createKey(password);
  let decipher = null;

  // extracts the IV from the first 16 bytes of the stream, then decrypts the rest
  // (assumes the first chunk carries at least the full 16-byte IV)
  const extractIVAndDecrypt = new Transform({
    transform(chunk, _, done) {
      if (!decipher) {
        const iv = chunk.slice(0, 16);
        decipher = createDecipheriv("aes192", key, iv);
        chunk = chunk.slice(16);
      }
      this.push(decipher.update(chunk));
      done();
    },
    flush(done) {
      this.push(decipher.final());
      done();
    },
  });

  return pumpify(extractIVAndDecrypt, createGunzip());
}
```
```js
// COMPRESS
import { createReadStream, createWriteStream } from "fs";
import { pipeline } from "stream";
// the two combined-stream factories above are assumed to be exported
// from a module saved as ./combined-streams.js
import { createCompressAndEncrypt } from "./combined-streams.js";

const [,, password, source] = process.argv;
const destination = `${source}.gz.enc`;

pipeline(
  createReadStream(source),
  createCompressAndEncrypt(password),
  createWriteStream(destination),
  (err) => {
    if (err) {
      console.error(err)
      process.exit(1)
    }
    console.log(`${destination} created`)
  }
);
```
```js
// DECOMPRESS
import { createReadStream, createWriteStream } from "fs";
import { pipeline } from "stream";
import { createDecryptAndDecompress } from "./combined-streams.js";

const [,, password, source] = process.argv;
const destination = source.replace('.gz.enc', '.out');

pipeline(
  createReadStream(source),
  createDecryptAndDecompress(password),
  createWriteStream(destination),
  (err) => {
    if (err) {
      console.error(err)
      process.exit(1)
    }
    console.log(`${destination} created`)
  }
);
```
## Forking streams
This is useful:
- when we want to send the same data to different destinations
- when we want to perform different transformations on the same data
- when we want to split the data based on some criteria

```js
import { createReadStream, createWriteStream } from 'fs';
import { createHash } from 'crypto';

const filename = process.argv[2];
const sha1Stream = createHash('sha1').setEncoding('hex');
const md5Stream = createHash('md5').setEncoding('hex');
const inputStream = createReadStream(filename);

inputStream
  .pipe(sha1Stream)
  .pipe(createWriteStream(`${filename}.sha1`));

inputStream
  .pipe(md5Stream)
  .pipe(createWriteStream(`${filename}.md5`));
```

Caveats:
- Both `sha1Stream` and `md5Stream` will be ended automatically when `inputStream` ends, unless `{ end: false }` is specified when invoking `pipe()`.
- The two forks of the stream will receive the same data chunks.
- If one destination pauses the source stream to handle backpressure for a long time, all the other destinations will be waiting as well.
- If we pipe to an additional stream after we've started consuming the data at the source (async piping), the new stream will only receive new chunks of data.
## Merging streams
Use the option `{ end: false }` when piping multiple sources to a single destination, and invoke `end()` on the destination only when all the sources have finished reading.

```js
import { createReadStream, createWriteStream } from 'fs';
import split from 'split';

const dest = process.argv[2];
const sources = process.argv.slice(3);
const destStream = createWriteStream(dest);

let endCount = 0;
for (const source of sources) {
  const sourceStream = createReadStream(source, { highWaterMark: 16 });
  sourceStream.on("end", () => {
    // end the destination only after every source has ended
    if (++endCount === sources.length) {
      destStream.end();
      console.log(`${dest} created`);
    }
  });

  sourceStream
    .pipe(split(line => line + "\n"))
    .pipe(destStream, { end: false });
}
```
To merge streams in order, we can modify the code as follows:

```js
import { createReadStream, createWriteStream } from 'fs';
import split from 'split';

const dest = process.argv[2];
const sources = process.argv.slice(3);
const destStream = createWriteStream(dest);

function processStream(index) {
  if (index >= sources.length) {
    destStream.end();
    console.log(`${dest} created`);
    return;
  }

  const sourceStream = createReadStream(sources[index], { highWaterMark: 16 });
  // move on to the next source only when the current one has ended
  sourceStream.on("end", () => processStream(index + 1));
  sourceStream
    .pipe(split(line => line + "\n"))
    .pipe(destStream, { end: false });
}

processStream(0);
```

The `multistream` package deals with merging streams in order.
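A minimal sketch of using the package (assuming it is installed; file names are made up for illustration):

```js
import MultiStream from 'multistream';
import { createReadStream } from 'fs';

// the source streams are consumed one after another, preserving their order
new MultiStream([
  createReadStream('part1.txt'),
  createReadStream('part2.txt')
]).pipe(process.stdout);
```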
## Multiplexing and demultiplexing
The operation of combining multiple streams (in this case, also known as channels) to allow transmission over a single stream is called multiplexing. The process of reconstructing the original streams from the data received over the shared stream is called demultiplexing. The devices that perform these operations are called multiplexer (or mux) and demultiplexer (or demux), respectively.
### Multiplexing

```js
import { fork } from "child_process";
import { connect } from "net";

function multiplexChannels(sources, destination) {
  let openChannels = sources.length;
  for (let i = 0; i < sources.length; i++) {
    sources[i]
      .on("readable", function () {
        let chunk;
        while ((chunk = this.read()) !== null) {
          // packet format: 1 byte channel ID | 4 bytes payload length | payload
          const outBuffer = Buffer.alloc(1 + 4 + chunk.length);
          outBuffer.writeUInt8(i, 0);
          outBuffer.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuffer, 5);
          console.log(`Sending packet to channel: ${i}`);
          destination.write(outBuffer);
        }
      })
      .on("end", () => {
        // close the destination only when all source channels have ended
        if (--openChannels === 0) {
          destination.end();
        }
      });
  }
}

// Create a TCP client connection to localhost:3000
const socket = connect(3000, () => {
  const child = fork(
    process.argv[2],
    process.argv.slice(3),
    // ensures the child process does not inherit the stdout and stderr of the parent
    { silent: true }
  );
  multiplexChannels([child.stdout, child.stderr], socket);
});
```
### Demultiplexing

```js
import { createWriteStream } from "fs";
import { createServer } from "net";

function demultiplexChannel(source, destinations) {
  let currentChannel = null;
  let currentLength = null;

  source
    .on("readable", () => {
      let chunk;
      // 1. read the channel ID (1 byte), if we don't have it yet
      if (currentChannel === null) {
        chunk = source.read(1);
        currentChannel = chunk && chunk.readUInt8(0);
      }

      // 2. read the payload length (4 bytes), if we don't have it yet
      if (currentLength === null) {
        chunk = source.read(4);
        currentLength = chunk && chunk.readUInt32BE(0);
        if (currentLength === null) {
          return null;
        }
      }

      // 3. read the payload and route it to the right destination
      chunk = source.read(currentLength);
      if (chunk === null) {
        return null;
      }

      console.log(`Received packet from: ${currentChannel}`);
      destinations[currentChannel].write(chunk);
      currentChannel = null;
      currentLength = null;
    })
    .on("end", () => {
      destinations.forEach(destination => {
        destination.end();
      });
      console.log("Source channel closed");
    });
}

const server = createServer(socket => {
  const stdoutStream = createWriteStream('stdout.log');
  const stderrStream = createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
});

server.listen(3000, () => console.log("Server started"));
```
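To try it out: start the demultiplexing server first, then run the multiplexing client, passing the path of the script to execute in the child process as the first argument; the child's stdout and stderr end up in `stdout.log` and `stderr.log` on the server side.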
`ternary-stream` -> a package that lets us implement an if...else statement for streams.