Daniele Vilela
Processing 1 Million SQL Rows to CSV using Node.js Streams

In this exercise, we will embark on a journey into the world of Node.js core concepts, focusing specifically on the power of streams. The challenge is to read and export one million SQL rows to a CSV file. So, first things first: what are streams?

According to the Node.js documentation:

A stream is an abstract interface for working with streaming data in Node.js.

But what does that mean? Think of streams as a way to handle a continuous flow of data in Node.js. Like a pipeline for data. But why do we need it? Well, sometimes our computer can't handle all the data at once because it's too big. Streams help by letting us work with small chunks of data at a time, kind of like how you watch a show on Netflix – it comes in small parts instead of the whole thing all at once.
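To get a feel for what "small chunks" means in practice, here's a minimal sketch (the file name big-file.txt is just a placeholder) that reads a large file piece by piece instead of loading it whole:

import { createReadStream } from 'node:fs';

// Read a (hypothetical) large file in ~64 KB chunks instead of loading it all at once
const readable = createReadStream('big-file.txt', { highWaterMark: 64 * 1024 });

readable.on('data', (chunk) => {
  console.log(`received ${chunk.length} bytes`);
});

readable.on('end', () => console.log('done reading'));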

In the upcoming example, we'll be working with three specific types of streams: Readable, Transform, and Writable. Based on a Postgres query, we are going to manipulate the results and save them to a CSV file.

You can find the example source code here:
Github Repo


Putting It All Together

Let's start by setting up the groundwork:

1. Adding the postgres dependencies

yarn add pg pg-query-stream

2. Creating a database pool

We set up a function to create a database connection pool using the pg package. This connection pool will enable us to manage and handle database connections.

import pg from 'pg';

// USER and PASSWORD are expected to come from your environment or config
const { USER, PASSWORD } = process.env;

export function createDatabasePool() {
  try {
    const connectionString = `postgres://${USER}:${PASSWORD}@localhost:5432/postgres`;
    const pool = new pg.Pool({ connectionString });
    return pool;
  } catch (error) {
    console.error('Error creating database pool:', error);
    throw error;
  }
}
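As a quick sanity check, the pool can be created once and reused for queries. A minimal usage sketch, assuming the function above lives in a file like database-pool.js (the path and the test query are just placeholders):

import { createDatabasePool } from './database-pool.js';

const pool = createDatabasePool();

// Run a trivial query to confirm the connection works
const { rows } = await pool.query('SELECT 1 AS ok');
console.log(rows); // [ { ok: 1 } ]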

3. Configuring Streams

We create three types of streams to accomplish our task: a Readable stream to fetch data from the database, a Transform stream to process and format the data, and a Writable stream to save the processed data to a CSV file.

To create the readable stream, we use the pg-query-stream package, which delivers the result rows from pg as a readable (object mode) stream.

Readable Stream

The stream uses a cursor on the server, so it keeps only a small number of rows in memory at a time; how many rows are fetched per cursor read is set by the batchSize option.

import QueryStream from 'pg-query-stream';

const queryStream = new QueryStream(
  "SELECT * FROM generate_series(0, $1) num",
  [1000000],
  { batchSize: 1000 }
);

Transform Stream

Because each chunk arrives as an object, we need to transform the data into a CSV line before adding it to the file. I'm also adding new fields to each row, as shown below.

import { Transform } from 'node:stream';

const transformStream = new Transform({
  objectMode: true, // rows arrive as plain objects, not buffers
  transform(row, encoding, callback) {
    row.description = `Row ${row.num}`;
    row.date = new Date().toString();
    callback(null, `${row.num}, ${row.description}, ${row.date}` + "\n");
  },
});
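One caveat: the template string above assumes values never contain commas, quotes, or newlines. If your real data might, a variant that quotes every field could look like this (an illustration added here, not part of the original example):

import { Transform } from 'node:stream';

// Wrap every value in quotes and double any embedded quotes, per the usual CSV convention
const toCsvField = (value) => `"${String(value).replace(/"/g, '""')}"`;

const safeTransformStream = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    row.description = `Row ${row.num}`;
    row.date = new Date().toISOString();
    callback(null, [row.num, row.description, row.date].map(toCsvField).join(',') + '\n');
  },
});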

Writable Stream

In this case we are writing the data to a file, so we can use a write stream from the Node.js fs module.

import fileStream from 'node:fs';

const fileWriteStream = fileStream.createWriteStream("output.csv");
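If you want a header row in the CSV, one simple option (not shown in the original code) is to write it to the file stream before the data starts flowing:

// Optional: emit a CSV header before any data rows are piped in
fileWriteStream.write('num,description,date\n');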

Starting the Data Flow

With the streams configured, we define a function called startStream that initiates the data flow process. Inside this function, we establish a connection to the database using the connection pool and create a query stream from the provided SQL query.

const startStream = (transformStream, writeStream) => {
  console.log("STARTED ", new Date());
  pool.connect((err, client, done) => {
    if (err) return console.error(err);

    // client.query(queryStream) returns the readable stream of rows
    const stream = client.query(queryStream);

    stream
      .pipe(transformStream)
      .pipe(writeStream)
      .on("error", console.error)
      .on("finish", () => {
        console.log("FINISHED: ", new Date());
        done(); // release the client back to the pool
      });
  });
};

startStream(transformStream, fileWriteStream);

Explanation:
stream.pipe(transformStream): connects the query stream to the transform stream. This means that data retrieved from the database will be passed through the transformStream for processing.

transformStream.pipe(writeStream): connects the transform stream to the write stream. Processed data from the transform stream is then written to the specified file using the writeStream.

.on("error", console.error): attaches an error event listener to the pipeline. If an error occurs at any stage, it will be logged to the console.

.on("finish", () => {...}): attaches a finish event listener to the pipeline. When the entire process of streaming, transforming, and writing is completed, this function will be executed.

Inside the finish event listener, a timestamp is logged using console.log("FINISHED: ", new Date()), marking the completion of the data processing.

done() is called to release the database client back to the pool, indicating that it's available for reuse.

Finally, the startStream function is invoked with transformStream and fileWriteStream as arguments, effectively starting the entire data processing and writing pipeline.
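As a side note, newer Node.js versions ship pipeline from node:stream/promises, which wires the same streams together, forwards errors from every stage, and plays nicely with async/await. A rough sketch of the same flow under that API, reusing the pool, queryStream, transformStream, and fileWriteStream defined above:

import { pipeline } from 'node:stream/promises';

const startStreamWithPipeline = async (transformStream, writeStream) => {
  console.log('STARTED ', new Date());
  const client = await pool.connect();
  try {
    // pipeline() awaits completion and rejects if any stage errors
    await pipeline(client.query(queryStream), transformStream, writeStream);
    console.log('FINISHED: ', new Date());
  } finally {
    client.release(); // always hand the client back to the pool
  }
};

await startStreamWithPipeline(transformStream, fileWriteStream);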

Visualizing the Process

For a visual representation of the process, take a look at the terminal:

$ node streams.js
STARTED  2023-08-10T05:33:06.521Z
FINISHED:  2023-08-10T05:33:24.567Z
Done in 28.70s.

Also, a new file named output.csv will be created, containing 1 million transformed rows!

Conclusion

In this exercise, we've explored the power of Node.js streams and their ability to handle large amounts of data efficiently. We've learned how to use Readable, Transform, and Writable streams to read data from a PostgreSQL database, process it, and save it as a CSV file. By breaking down the data processing into smaller chunks, we can conserve memory and improve the overall performance of our application.
Feel free to explore the code, experiment with different settings, and adapt it to your own projects. Happy coding!

Top comments (7)

Lucas Andrade Flores

Awesome content, congrats! I recommend you explore some things: For the transform stream, maybe it will be better to use an async or sync generator. And for all pipe operations, could be nice to use also the async pipeline. I have something similar (almost) on this repo: github.com/LucasAndFlores/file-to-.... But as the Mandalorian says: This is the way
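For reference, a rough sketch of the async-generator style suggested here, combined with the promise-based pipeline (this assumes a client obtained via await pool.connect() and the queryStream and fileWriteStream from the post):

import { pipeline } from 'node:stream/promises';

// pipeline() accepts an async generator function in place of a Transform stream
await pipeline(
  client.query(queryStream),
  async function* (rows) {
    for await (const row of rows) {
      yield `${row.num}, Row ${row.num}, ${new Date().toISOString()}\n`;
    }
  },
  fileWriteStream
);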

Daniele Vilela

Thank you for commenting! I'll definitely check your repo :)

Shifa Ur Rehman

Awesome content ❀️

BNMeier

Why not just use the COPY INTO and EXPORT native Postgres commands and do the CSV output in a couple seconds? It's 2 lines of SQL and extremely efficient.

Daniele Vilela • Edited

Thank you for taking the time to comment on my post! You're absolutely right that using PostgreSQL's native commands like COPY INTO and EXPORT is well suited to exporting data to a CSV file.

However, the purpose of the post was to explore the capabilities of Node.js streams and give an example of how streams work and how they can be used to handle large datasets.

The SQL query here is just a very simple example, and I can give a few cases below where the export cannot be achieved in a single query, especially when your constraints are project requirements, dynamic data, and available tools:

  • Different data sources: sometimes you don't have all the information needed in the same database, and the exported file needs to combine them for your customer

  • Transformations: specific transformations that need to be applied to every row, or to a dynamic field inside a jsonb column

  • Sometimes you just have complex data that changes along with your user configurations. User A can have access to something and User B to something else, etc.

In the real world, especially at startups, you might not have access to ETL tools for these cases, nor the skill and time to build and maintain them.

That's why it's not always 2 lines of SQL, and the combination of 'pg' streams and Node.js streams can be very handy for that moment in time.

Hope this answers your comment 😊

the_aceix

Nice article! Will this utilise all available CPU cores?

Uriel dos Santos Souza

No! Single thread.