DEV Community

Cover image for Processing massive amounts of data on demand without crashing nodejs main thread
Wesley Miranda
Wesley Miranda

Posted on • Updated on

Processing massive amounts of data on demand without crashing nodejs main thread

Sometimes we can have some problems processing big files using Node.js because the main thread(responsible for the event loop) can stop. For this reason, I decided to write about how to use streams in Node.js showing a funny example. Let's go!

Application: Node.js application that downloads a huge CSV file from Google Cloud Storage, processes the data, transforms it into a JSON file, extracts some data insights, and finally compresses the file and uploads it again to the Cloud Storage. (We are going to create a terminal progress bar to visualize the process progress)

Main principles:

  • Streams: Interface to work with chunks of data.
  • Readable Streams: Interface to consume data streams.
  • Writable Streams: Interface to provide data and source to the data streams.
  • Duplex Streams: Implement both interfaces, Readable and Writable.
  • Transform Streams: It's a Duplex Stream used to modify the data during the process.
  • PassThrough: It's a Transform Stream useful when we need to check or test something during the stream pipeline process.
  • Pipeline: It's useful to handle all the stream process.

Steps to reproduce:

  • Download file stream
  • Create a progress bar
  • Transform each CSV file line into javascript objects
  • Get information from the file
  • Convert the data into JSON format
  • Compress the data
  • Upload the new file converted

Requirements

  • The CSV file you can find here

  • We are going to use Node 16.16 version

  • You need to have a Google account to access Google Cloud Services.

Cloud Storage Service

We need to install cloud storage library to deal with google cloud service.

npm install @google-cloud/storage
Enter fullscreen mode Exit fullscreen mode

If you need help with configuring your Cloud Storage service on your Google account, many good tutorials can help you with that, it's not the purpose of this tutorial.

Let's create our first file cloudStorageFileService.js in src folder to work with our storage.

src/cloudStorageFileService.js

const { Storage } = require('@google-cloud/storage')
const path = require('path')
const serviceKey = path.join(__dirname, '../gkeys.json')


class CloudStorageFileService {

// (1)
    constructor() {
        this.storage = new Storage({
            projectId: 'my-project-id',
            keyFilename: serviceKey
        })
    }

// (2)
    async downloadFile(bucketName, fileName) {
        return await this.storage
            .bucket(bucketName)
            .file(fileName)
            .createReadStream()
    }

// (3)
    async uploadFile(bucketName, destFileName) {
        return await this.storage
            .bucket(bucketName)
            .file(destFileName)
            .createWriteStream()
    }

// (4)
    async getFileSize(bucketName, fileName) {
        const [metadata] = await this.storage
                        .bucket(bucketName)
                        .file(fileName)
                        .getMetadata();
        return metadata.size
    }
}

module.exports = CloudStorageFileService
Enter fullscreen mode Exit fullscreen mode
  1. Basic configurations to use Cloud Storage service, as the project id and the path with your Google Cloud credentials.

  2. Google Cloud Storage provides us a Readable Stream for downloading files, we can use it to download our huge file as a stream and not stuck main Node.js thread.

  3. Google Cloud Storage also provides a Writable Stream to upload files.

  4. The last function we are going to use to create our progress bar.


Progress Bar

Here we are extending PassThrough because we don't need to apply any modification to the stream, only to get some data to create our progress bar.

Important to know that Streams implement Event Emitter interface, and we can work with emit to emit events and on to listen to them.

src/progressPass.js

const { PassThrough } = require('stream')

class ProgressPass extends PassThrough {

// (1)
    constructor(fileSize, options = {}) {
        super({ ...options })
        this.on('data', this.processData)
        this.on('progress', this.showResult)
        this.on('close', this.finishProgress)
        this.bytesRead = 0
        this.progress = 0
        this.fileSize = fileSize
        this.createProgressBar()
    }

// (2)
    processData(data) {
        this.bytesRead += data.length
        this.progress = (this.bytesRead / this.fileSize) * 100
        this.emit('progress', Math.floor(this.progress))
    }

// (3)
    createProgressBar() {
        process.stdout.write("\x1B[?25l")
        process.stdout.write('[')
        for(let i = 1; i <= 101; i++) {
            process.stdout.write('-')
        }
        process.stdout.write(']')
    }

// (4)
    showResult(progress) {
        process.stdout.cursorTo(progress+1)
        process.stdout.write('=')


        process.stdout.cursorTo(105)
        process.stdout.write(`${progress}%`)

    }

    finishProgress() {
        process.stdout.write("\x1B[?25h")
        process.stdout.write("\n")
    }
}


module.exports = ProgressPass

Enter fullscreen mode Exit fullscreen mode
  1. Initializing our progress bar:
    • The file size parameter added to the constructor will be useful to create the progress percentage.
    • data event is emitted when comes a new chunk to the current stream.
    • progress is our custom event that we emit to update the progress bar
    • close is the event emitted when there is no more data to pass through the stream.
  2. Every time that comes to a new data we get the chunk and add to the sum the chunk's length and emitting progress event to update the progress bar.
  3. Creating a unfilled progress bar like:
[------------------------------------------------]
Enter fullscreen mode Exit fullscreen mode

4 When progress event happens we update the progress bar with the current percentage:

[===========-------------------------------------]  25%
Enter fullscreen mode Exit fullscreen mode

Transforming CSV line into a javascript object

Now It's time to get every line of our CSV file and convert it to JS object, the intention of it, is to manipulate the data easier and at the final of the process, convert it to a JSON file.

The strategy is to convert the binary chunk into text, and to read every line of the it.

To get the values we need to split the lines using ,.

When we are reading using streams, by default each chunk has 16kb (We can modify it), and at the moment of the separation could occur something like that:

chunk 1:

value1, value2, val
Enter fullscreen mode Exit fullscreen mode

chunk 2:

ue3, value4, value5
Enter fullscreen mode Exit fullscreen mode

Keep in mind we need to treat this kind of thing and save temporarily the previous chunk before splitting the text.

To treat the transformation we are going to use Transform Stream, as I mentioned It's useful when we want to transform our data chunks.

src/objectTransform.js

const { Transform } = require('stream')

class ObjectTranform extends Transform {

// (1)
    constructor(options = {}) {
        super({ ...options })
        this.headerLine = true
        this.keys = []
        this.tailChunk = ''
    }

// (2) 
    _transform(chunk, encoding, callback) {
        const stringChunks = chunk.toString("utf8")
        const lines = stringChunks.split('\n')

        for (const line of lines) {
            const lineString = (this.tailChunk + line)
            let values = lineString.split(',')

            if (this.headerLine) {
                this.keys = values
                this.headerLine = false
                continue
            }


            if (values.length !== this.keys.length || lineString[lineString.length - 1] === ',') {
                this.tailChunk = line
            } else {
                const chunkObject = {}

                this.keys.forEach((element, index) => {
                    chunkObject[element] = values[index]
                })

                this.tailChunk = ''
                this.push(`${JSON.stringify(chunkObject)}`)
            }
        }
        callback()
    }

// (3)
    _flush(callback) {
        callback()
    }

}

module.exports = ObjectTranform
Enter fullscreen mode Exit fullscreen mode
  1. Let's save the keys to create our JS objects using a flag. The tailChunk is for saving the incompleted CSV lines.

  2. The magic happens here. We read every line, split the text wether the lines are completed, and convert it to JS object.

  3. flush is called when there is no more data to be processed.


Extracting information

Now that we have JS objects we can extract some data from the chunks.

Here again, we are going to use PassThrough Stream because we only want to check some information.

src/monitorTransform.js

const { PassThrough } = require('stream')

class MonitorTransform extends PassThrough {
    constructor(options = {}) {
        super({ ...options })
        this.on('data', this.processData)
        this.on('close', this.showResult)
        this.totalCrimes = 0
        this.boroughTotal = new Map()
        this.monthTotal = new Map()
        this.yearTotal = new Map()
    }

// (1)
    processData(data) {
        const row = JSON.parse(data.toString())
        const rowCrimeQuantity = Number(row.value) || 0
        const currentBoroughTotal = Number(this.boroughTotal.get(row.borough)) || 0
        const currentMonthTotal = Number(this.monthTotal.get(row.month)) || 0
        const currentYearTotal = Number(this.yearTotal.get(row.year)) || 0

        this.totalCrimes += rowCrimeQuantity
        this.boroughTotal.set(row.borough, currentBoroughTotal + rowCrimeQuantity)
        this.monthTotal.set(row.month, currentMonthTotal + rowCrimeQuantity)
        this.yearTotal.set(row.year, currentYearTotal + rowCrimeQuantity)
    }

// (2)
    showResult() {
        console.log(this.totalCrimes)
        console.log(this.boroughTotal)
        console.log(this.monthTotal)
        console.log(this.yearTotal)
    }
}


module.exports = MonitorTransform
Enter fullscreen mode Exit fullscreen mode
  1. When a data event happens we can process the chunk and get whatever we want from it.

  2. If there is no more data to process we show the extracted information.


Transforming the file to a JSON file

All the chunks are in a JS object format, but when we save the file, won't work properly, we need to treat it, transforming it into an array of objects using Transform Stream.

src/jsonTransform.js

const { Transform } = require('stream')

class JsonTransform extends Transform {
    constructor (options = {}) {
        super({ ...options })
        this.once('data', this.startJson)
        this.firstLine = true
    }

// (1)
    startJson() {
        this.push('[')
    }

// (2)
    _transform (chunk, encoding, callback) {
        const row = JSON.parse(chunk.toString())
        const newChunk = this.firstLine ? `${JSON.stringify(row)}` : `,${JSON.stringify(row)}`
        this.push(newChunk)
        if(this.firstLine) {
            this.firstLine = false
        }
        callback()
    }

// (3)
    _flush(callback) {
        this.push(']')
        callback()
    }

}

module.exports = JsonTransform
Enter fullscreen mode Exit fullscreen mode
  1. Our array should starts with [, it happens only in the first chunk. Here we are using once instead of on, to listen only one time the event data.
  2. Here we are putting , to separate our JS objects.
  3. At the end of the file we need to close the array with ]

Compressing the chunks

For it we don't need so much efforts, Node.js has a core library to do it.

The zlib from Node.js is a Transform Stream used to compress chunks of data during the streaming process. We only need to add this to our stream pipeline.


Creating Stream pipeline process

The pipeline process needs to have at least the Readable and the Writable Stream, and it can have how many Tranform Stream and PassThrough Stream we want.

To handle the pipeline process we are going to use the Builder design pattern, to control the creation process of our pipeline and throw some errors.

src/fileProcessor.js

const { pipeline } = require('stream/promises')

class FileProcessor {
    constructor() {
        this.readableStream = null
        this.transforms = []
        this.writableStream = null
    }

// (1)
    setReadable(readableStream) {
        this.readableStream = readableStream
        return this
    }

// (2)
    addTransforms(transformsStream) {
        this.transforms = transformsStream
        return this
    }

// (3)
    setWritable(writableStream) {
        this.writableStream = writableStream
        return this
    }

// (4)
    async execute() {
        try {
            if(!this.readableStream) {
                throw Error('Readable stream not implemented')
            }
            if(!this.writableStream) {
                throw Error('Writable stream not implemented')
            }
            await pipeline(this.readableStream, ...this.transforms, this.writableStream)
        } catch (error) {
            console.log(error)
        }
    }
}

module.exports = FileProcessor
Enter fullscreen mode Exit fullscreen mode
  1. Setting our Readable Stream, in this case should be the donwload function from Google Cloud Storage.

  2. Adding Tranforms to our pipeline. All the Transform and PassThrough should be added as an array.

  3. The Writable Stream is the upload function from Google Cloud Storage.

  4. To execute the pipeline if there are the Writable and Readable Stream. The pipeline function must have at least these two functions.


Joining Everything

It's time to import everything we've created, define the constants and execute the the entire app.

src/index.js

// (1)
const FileProcessor = require('./fileProcessor')
const JsonTransform = require('./jsonTransform')
const MonitorTransform = require('./monitorTransform')
const ObjectTransform = require('./objectTransform')
const { createGzip } = require('node:zlib')
const CloudStorageFileService = require('./cloudStorageFileService')
const ProgressPass = require('./progressPass')
const fileProcessor = new FileProcessor()
const cloudFileService = new CloudStorageFileService()

// (2)
const gzip = createGzip()

// (3)
const bucketName = 'myfileuploads'
const fileName = 'london_crime_by_lsoa2.csv'
const destFileName = 'london_crime_by_lsoa2.tar.gz'

// (4)
    ;
(async () => {
    try {
        const fileSize = await cloudFileService.getFileSize(bucketName, fileName)
        await fileProcessor
            .setReadable(await cloudFileService.downloadFile(bucketName, fileName))
            .addTransforms([new ProgressPass(fileSize), new ObjectTransform(), new MonitorTransform(), new JsonTransform(), gzip])
            .setWritable(await cloudFileService.uploadFile(bucketName, destFileName))
            .execute()
    } catch (e) {
        console.log(e)
    }
})()

Enter fullscreen mode Exit fullscreen mode
  1. Here are all the things we've created, services, Transform Streams and PassThrough Streams.

  2. As I mentioned, this is the Node.js library to compress data.

  3. We need to define our Cloud Storage bucket and file.

  4. This is our final function to execute everything together, defining the Readable, Transforms, and the Writable.


Takeaways

  • You can use this approach to import and export data between databases or generate reports.
  • Streams are useful to process and treat audio and video files and for file conversion.

You can take a look at the entire code here

Top comments (0)