Optimizing Conditional Bulk Insert in Node.js + PostgreSQL

Yogi Saputro
Full Stack Developer with business acumen. Also a bit of data and business enthusiast.
・8 min read

This is my journey of improving a bulk insert algorithm. The title is a mouthful, but the article is a worthwhile read for this specific niche.

Intro

If you are:

  • Working on an ETL pipeline from various sources
  • Scraping websites
  • Aggregating data

chances are you will need a conditional bulk insert.

I use the term conditional bulk insert for an operation where many records need to be inserted into the DB at once, while ensuring there is no conflict on unique identifiers.

In this article, I'm going to compare three approaches to conditional bulk insert for Node.js and PostgreSQL:

  1. Naive approach
  2. Filtered approach
  3. ON CONFLICT approach

Basic Implementation

For the purpose of demonstration, let's say there is a system for benefits program registration in country X. Data collection is carried out offline by field agents. Each agent then uploads applicant data to the system at any time. Suppose the system must check for duplicate national IDs: a citizen of X can only be registered once per national ID. For simplicity, we will only store the first record for each national ID. If a duplicate arrives later, it will neither update the existing record nor be inserted into the database.

Now we will determine the best algorithm for inserting data under these requirements.

Software requirements:

  • node at least v12.0
  • npm at least v6.0
  • PostgreSQL at least v9.5

Project Setup:

  • Make a new project folder, for example: mkdir bulk_insert_demo
  • Go to the directory: cd bulk_insert_demo
  • Create a new Node project: npm init -y
  • Install the necessary packages: npm install pg-promise faker
  • Create a new file, index.js
  • Create a new table, benefit_recipients, with the query below:
-- table definition
CREATE TABLE IF NOT EXISTS benefit_recipients (
  id serial NOT NULL PRIMARY KEY,
  national_id VARCHAR UNIQUE NOT NULL,
  person_name VARCHAR DEFAULT NULL,
  city VARCHAR DEFAULT NULL,
  benefit_amount INT DEFAULT 0,
  created_at TIMESTAMP DEFAULT NOW() NOT NULL,
  updated_at TIMESTAMP DEFAULT NULL
);

1. Naive Approach

The naive approach first checks whether a record's ID already exists. If it does, we do nothing. Otherwise, we insert the record with its new, unique ID.

Translating this to Node.js with the pg-promise package is simple. I use ES6 async-await style. Checking for national_id is possible using SELECT COUNT(*). The INSERT is also straightforward. I put them in a classic for loop because it works well with async functions. Finally, I added some console output for information.

/**
 * Naive approach: for every row, check if national_id exists in DB. If it doesn't, insert the row
 * @param {*} client pg-promise client
 * @param {*} bulkData data to insert as array of object
 */
const naiveBulkInsert = async (client, bulkData) => {
  try {
    let counter = 0;
    for (let i = 0; i < bulkData.length; i++) {
      const checkIdExists = await client.one('SELECT COUNT(*) FROM benefit_recipients WHERE national_id = $1', [bulkData[i].national_id]);
      if (Number(checkIdExists.count) === 0) { // COUNT(*) is a bigint, which arrives as a string such as '0'
        await client.any(`
          INSERT INTO benefit_recipients (national_id, person_name, city, benefit_amount)
          VALUES ($1, $2, $3, $4)`,[bulkData[i].national_id, bulkData[i].person_name, bulkData[i].city,
          bulkData[i].benefit_amount]);
        counter++;
        process.stdout.write(`Inserted ${counter} records...\r`);
      }
    }
    process.stdout.write(`\nSuccessfully inserted ${counter} records.\n`);
  } catch (error) {
    console.log(error);    
  }
}

Advantages

I have to admit, this is the kind of algorithm I wrote early in my career. It comes to mind immediately, and it is simple and readable. These are the advantages of the naive approach:

  1. Easy to implement
  2. Easy to add custom validation logic
  3. Can handle duplicates within the data source as well as between the source and the database

Disadvantages

  1. Runs a lot of queries (between N and 2N, where N is the number of records): one SELECT per record plus one INSERT per new record. We'll see the performance later.
  2. Not ACID compliant unless wrapped in a transaction.
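
To illustrate the second point, here is a minimal sketch of the same loop wrapped in a pg-promise transaction via tx(). The function name naiveBulkInsertInTx is hypothetical, and `db` is assumed to be a pg-promise database object. This is not the version benchmarked in this article, and it does not reduce the number of queries.

```javascript
// Sketch: the naive loop wrapped in a single transaction.
// `db` is assumed to be a pg-promise database object (it exposes tx()).
const naiveBulkInsertInTx = (db, bulkData) =>
  db.tx(async (t) => {
    let counter = 0;
    for (const row of bulkData) {
      const { count } = await t.one(
        'SELECT COUNT(*) FROM benefit_recipients WHERE national_id = $1',
        [row.national_id]
      );
      // COUNT(*) is a bigint, which arrives in JavaScript as a string.
      if (Number(count) === 0) {
        await t.none(
          `INSERT INTO benefit_recipients (national_id, person_name, city, benefit_amount)
           VALUES ($1, $2, $3, $4)`,
          [row.national_id, row.person_name, row.city, row.benefit_amount]
        );
        counter++;
      }
    }
    return counter; // the transaction commits only if the callback resolves
  });
```

If any statement rejects, the callback rejects and pg-promise rolls the whole batch back, so a failure midway no longer leaves a partial insert.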

2. Filtered Approach

The naive approach has a drawback: it uses many queries. It is like using one truck to deliver a single box, then sending the truck back and forth until all boxes are delivered. Sending all boxes on one truck would be more sensible.

That is the basic idea of the filtered approach. Instead of checking N times and inserting 0 to N times, why not create one query for all the checks and one for all the inserts? More precisely, one search query for duplicates between the data source and the database, then one insert query for the records that are not duplicates.

The diagram below explains what I wish to achieve.
[Figure: Bulk insert explained using a Venn diagram]

For the duplicate search, a SELECT with a WHERE ... IN (...) clause will do. The query result then serves as the filtering criteria for the data source. I did some map() and filter() here. The methods can be chained, if that is preferable. After that, I used a helper from pg-promise to insert multiple records.

I found out later that this approach didn't resolve duplicates within the data source, for example if a benefit applicant registers more than once. So I fixed it by making an internal duplicate check the first step.

So, this is the full method for the filtered approach.

const pgp = require('pg-promise')();

/**
 * Filtered approach:
 * - Remove internal duplicates 
 * - Search intersection of national_id between bulk data & DB.
 * - Then, filter out duplicates
 * - Finally, insert the rest into DB
 * 
 * @param {*} client pg-promise client
 * @param {*} bulkData data to insert as array of object
 */
const filterBulkInsert = async (client, bulkData) => {
  try {
    // Resolve duplicates in bulkData. Can be more complex than this.
    const uniqueBulkData = bulkData.filter((value,idx,arr)=>arr.findIndex(el=>(el.national_id === value.national_id))===idx);

    const inputNationalIds = uniqueBulkData.map((item) => {return item.national_id});
    const listDuplicates = await client.any('SELECT national_id FROM benefit_recipients WHERE national_id IN ($1:csv)',[inputNationalIds]);
    const duplicatesArray = listDuplicates.map((item) => {return item.national_id});
    const dataToInsert = uniqueBulkData.filter((item) => !duplicatesArray.includes(item.national_id));
    const columns = Object.keys(bulkData[0]).map((str) => str.trim());
    const setTable = new pgp.helpers.ColumnSet(columns , {table: 'benefit_recipients'});
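    if (dataToInsert.length === 0) { // helpers.insert cannot build a query from an empty array
      console.log('No new records to insert.');
      return;
    }
    const insert = pgp.helpers.insert(dataToInsert, setTable);
    await client.any(insert);
    console.log(`Successfully inserted ${dataToInsert.length} records.`);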
  } catch (error) {
    console.log(error);
  }
}

Advantages

  1. Runs with only two queries
  2. The INSERT is a single statement, so it is ACID compliant

Disadvantages

  1. Doesn't handle duplicates within data source by default
  2. Has a lot of array manipulation (may be substituted with more complex queries)
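
As an aside on the array manipulation, the two filter steps can be collapsed into one pass with a Set. The sketch below is an assumption of mine, not the code measured in this article, and selectRowsToInsert is a hypothetical helper name. It expects the list of national_id values that the SELECT found in the database.

```javascript
// Hypothetical helper: picks the rows worth inserting in a single pass.
// `existingIds` is the array of national_id values already found in the DB.
const selectRowsToInsert = (bulkData, existingIds) => {
  // IDs already taken, either in the DB or earlier in the same batch.
  const seen = new Set(existingIds);
  const rows = [];
  for (const item of bulkData) {
    if (!seen.has(item.national_id)) {
      seen.add(item.national_id);
      rows.push(item);
    }
  }
  return rows;
};

// Made-up data for illustration only.
const batch = [
  { national_id: '111' },
  { national_id: '222' },
  { national_id: '111' }, // duplicate inside the batch
  { national_id: '333' }, // already in the DB
];
console.log(selectRowsToInsert(batch, ['333']).map((r) => r.national_id));
```

Set lookups do not rescan the array for every element, which is the main difference from the findIndex() and includes() calls above.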

3. ON CONFLICT Approach

I kept pushing to improve the query. It turned out that the simple PostgreSQL clause ON CONFLICT(...) DO NOTHING is the answer I needed. It gets rid of the SELECT query and the array manipulation from the previous approach. The code became simpler and more concise. However, note that the columns in ON CONFLICT(...) must be covered by a unique constraint or unique index, such as the UNIQUE declaration in the table definition.

The algorithm for the third approach is as follows:

const pgp = require('pg-promise')();

const onConflictBulkInsert = async (client, bulkData) => {
  try {
    const columns = Object.keys(bulkData[0]).map((str) => str.trim());
    const setTable = new pgp.helpers.ColumnSet(columns , {table: 'benefit_recipients'});
    const onConflict = ' ON CONFLICT(national_id) DO NOTHING RETURNING *';
    const insertOnConflict = pgp.helpers.insert(bulkData, setTable) + onConflict;
    const result = await client.any(insertOnConflict);
    console.log(`Successfully inserted ${result.length} records.`);
  } catch (error) {
    console.log(error);
  }
}

Advantages

  1. Runs with only ONE query
  2. ACID compliant
  3. Can handle duplicates within the data source as well as between the source and the database

Disadvantages

none so far
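
One practical detail: RETURNING only yields rows that were actually inserted, so the number of skipped duplicates is the input size minus the number of returned rows. A small sketch of that bookkeeping follows. The name insertAndReport is hypothetical, `client` is assumed to be a pg-promise database object, and `insertQuery` is assumed to be the string built with pgp.helpers.insert() plus the ON CONFLICT clause.

```javascript
// Sketch: report inserted vs. skipped rows for an
// INSERT ... ON CONFLICT DO NOTHING RETURNING query.
const insertAndReport = async (client, insertQuery, inputCount) => {
  const insertedRows = await client.any(insertQuery);
  const inserted = insertedRows.length;
  // Rows that hit a conflict are not returned, so they count as skipped.
  return { inserted, skipped: inputCount - inserted };
};
```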

Testing Setup

For demonstration purposes, I defined some testing strategies:

  • national_id length and bulk data size are parameterized to ensure duplicate values. For example, suppose national_id is a three-digit number with 1000 possible values (0-999). If I create 750 samples, then according to the birthday problem there is a >99% chance of at least one duplicate national_id.
  • Tests are run iteratively in various scenarios. First, each test is run independently for small samples. Then, the sample size is increased. After that, there is also a test that runs the three algorithms sequentially in random order to further measure performance.

For testing purposes, I created some functions.
generateSample() creates an array of objects and takes two parameters: sample size and national_id length.
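
The >99% figure can be checked with a few lines of arithmetic. The sketch below assumes every national_id is drawn uniformly at random from the possible values, which is my assumption about the generator. The function name duplicateProbability is hypothetical.

```javascript
// Probability that at least two of `samples` values collide when each is
// drawn uniformly at random from `possibleValues` distinct values.
const duplicateProbability = (samples, possibleValues) => {
  if (samples > possibleValues) return 1; // pigeonhole principle
  let allDistinct = 1;
  for (let i = 0; i < samples; i++) {
    // Chance that draw i avoids the i values already taken.
    allDistinct *= (possibleValues - i) / possibleValues;
  }
  return 1 - allDistinct;
};

console.log(duplicateProbability(750, 1000) > 0.99); // true
```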

const faker = require('faker');  

const generateSample = (size, idLength) => {
  let sample = [];
  for (let i = 0; i < size; i++) {
    sample.push({
      // national_id, center of this test, is defined with specific idLength
      national_id : faker.finance.account(idLength),
      person_name : faker.name.firstName() + ' ' + faker.name.lastName(),
      city : faker.address.cityName(),
      benefit_amount : faker.finance.amount(100, 500, 0) * 10000
    });
  }
  return sample;
}

cleanTable() deletes all records from benefit_recipients. It is used after inserting into the database. Otherwise, later runs would find almost nothing but duplicates, right?

const cleanTable = async (client) => {
  // I don't use TRUNCATE because I will display number of deleted records
  const deleteRecords = await client.any(`DELETE FROM benefit_recipients
  WHERE created_at > (current_timestamp - interval '1 day')
  RETURNING *`);
  console.log(`Deleted ${deleteRecords.length} records.`);
}

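
If only the number of deleted records matters, a possible alternative is to read it from the result object instead of sending every deleted row back with RETURNING *. This is a sketch under the assumption that `client` is a pg-promise database object. The name cleanTableByCount is hypothetical.

```javascript
// Sketch: same cleanup, but reading the affected-row count from the result
// object rather than returning every deleted row to Node.js.
const cleanTableByCount = async (client) => {
  const result = await client.result(
    `DELETE FROM benefit_recipients
     WHERE created_at > (current_timestamp - interval '1 day')`
  );
  console.log(`Deleted ${result.rowCount} records.`);
  return result.rowCount;
};
```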

runNaiveTest(), runFilteredTest(), and runOnConflictTest() each consist of timing, sample generation, and INSERT execution.

const runNaiveTest = async (client, sampleSize, idLength) => {
  try {
    console.time('naive_approach');
    console.log('executing naive approach...');
    console.log(`creating ${sampleSize} samples...`);
    let sample1 = generateSample(sampleSize, idLength);
    await naiveBulkInsert(client, sample1);
    console.timeEnd('naive_approach');
  } catch (error) {
    console.log(error);
  }
}
const runFilteredTest = async (client, sampleSize, idLength) => {
  try {
    console.time('filter_approach');
    console.log('executing filter approach...');
    console.log(`creating ${sampleSize} samples...`);
    let sample2 = generateSample(sampleSize, idLength);
    await filterBulkInsert(client, sample2);
    console.timeEnd('filter_approach');
  } catch (error) {
    console.log(error);
  }
}
const runOnConflictTest = async (client, sampleSize, idLength) => {
  try {
    console.time('on_conflict_approach');
    console.log('executing on conflict approach...');
    console.log(`creating ${sampleSize} samples...`);
    let sample3 = generateSample(sampleSize, idLength);
    await onConflictBulkInsert(client, sample3);
    console.timeEnd('on_conflict_approach');
  } catch (error) {
    console.log(error);
  }
}
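
Since the three runners differ only in their label and insert function, they could be folded into one generic helper. The sketch below is hypothetical (runTimedTest is not part of my original code). It takes the sample generator and the insert function as arguments.

```javascript
// Hypothetical generic runner. `makeSample` and `insertFn` are passed in, so
// the same function can time any of the three approaches.
const runTimedTest = async (label, insertFn, makeSample, client, sampleSize, idLength) => {
  try {
    console.time(label);
    console.log(`executing ${label}...`);
    console.log(`creating ${sampleSize} samples...`);
    const sample = makeSample(sampleSize, idLength);
    await insertFn(client, sample);
  } catch (error) {
    console.log(error);
  } finally {
    console.timeEnd(label); // stop the timer whether or not the insert failed
  }
};

// Possible usage with the functions defined in this article:
// await runTimedTest('naive_approach', naiveBulkInsert, generateSample, dbClient, 750, 3);
```

As in the three functions above, sample generation is included in the measured time.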

And finally, an adjustable main() function. It runs specific tests with a defined number of iterations, sample size, national_id length, and PostgreSQL client. In the example below, I run a sequential test of the naive, filtered, and ON CONFLICT approaches, 10 times.
If I want to test each approach independently, I can adjust it here and re-run the test.

const main = async () => {
  const dbClient = pgp(process.env.DB_CLIENT);
  const iteration = 10;
  // to create >99% duplicate guarantee, I use formula (samples) / (10^idLength) = 0.75
  const samples = 750;
  const idLength = 3;
  // adjust desired test here
  for (let idx = 0; idx < iteration; idx++) {
    await runNaiveTest(dbClient,samples,idLength);
    await runOnConflictTest(dbClient,samples,idLength);
    await runFilteredTest(dbClient,samples,idLength);
    await cleanTable(dbClient);
  }
}

main();

All of the code above can be written as one index.js file like I did, or spread across separate files with proper imports. Then run
DB_CLIENT=postgres://theusername:thepassword@localhost:5432/db_name node index.js, changing the DB_CLIENT value to the correct connection string.
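
Because main() reads the connection string from the environment, a small guard can give a clearer error when the variable is forgotten. This is an optional sketch, and getConnectionString is a hypothetical helper name.

```javascript
// Hypothetical guard: fail fast with a readable message when the
// DB_CLIENT environment variable is missing or empty.
const getConnectionString = (env = process.env) => {
  const value = env.DB_CLIENT;
  if (!value) {
    throw new Error('DB_CLIENT is not set. Expected a PostgreSQL connection string.');
  }
  return value;
};

// Possible usage inside main():
// const dbClient = pgp(getConnectionString());
```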

Testing Result

First, I tested each approach independently with 750 samples.
This is the result.
[Figure: Bulk insert algorithm comparison for small samples]
Well, I expected the naive approach to be slow, but not THAT slow. It is almost 2000 times slower than the filtered and ON CONFLICT approaches. That number might vary for various reasons. My database is on a remote server, so network latency is a factor. One thing is certain: the naive approach is much slower than the others. First lesson learned: never use the naive approach. From this point on, I'll exclude the naive approach from testing.
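
To see why latency hurts the naive approach so much, recall that it waits for between N and 2N round trips, one after another. The sketch below turns that into a rough estimate. The 20 ms latency is a made-up number for illustration, not a measurement from my setup, and naiveNetworkWaitMs is a hypothetical helper name.

```javascript
// Rough lower/upper bound on time spent waiting for the network alone.
// `latencyMs` is a hypothetical per-query round-trip time, not a measurement.
const naiveNetworkWaitMs = (rowCount, latencyMs) => ({
  min: rowCount * latencyMs,     // every row is a duplicate: N SELECTs
  max: 2 * rowCount * latencyMs, // every row is new: N SELECTs + N INSERTs
});

console.log(naiveNetworkWaitMs(750, 20)); // { min: 15000, max: 30000 }
```

The other two approaches pay that round-trip cost only once or twice per batch, regardless of N.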

On a side note, the filtered approach is a bit faster than the ON CONFLICT approach. We'll see more of them.

This time, I increased the sample size to 7500 and adjusted national_id to 4 digits. This is the performance result.
[Figure: Bulk insert algorithm comparison for medium samples]
For 7500 samples, the filtered approach is about 33% faster than the ON CONFLICT approach.

Then, I increased the sample size to 75000. This time, there's a surprise.
[Figure: Bulk insert algorithm comparison for big samples]
The filtered approach takes much longer this time. Its execution time also did not grow linearly from the previous sample. On the other hand, the ON CONFLICT approach seems to scale linearly, as the increase in its execution time is consistent.

This might be caused by array manipulation. In the filtered approach, array manipulation is carried out by Node.js. The filter() calls with a nested findIndex() or includes() scan the array once per element, so the work grows roughly quadratically with sample size. On bigger sample sizes, it breaks down. That is not the case for the ON CONFLICT approach, where the heavy lifting is done by PostgreSQL internals, which are optimized for relational operations.
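
To make that guess more concrete, the sketch below counts the element comparisons performed by the filter() plus findIndex() de-duplication used in filterBulkInsert. It is an illustration with small, made-up inputs, not a benchmark. The names countDedupComparisons and uniqueBatch are hypothetical.

```javascript
// Counts how many element comparisons the filter + findIndex de-duplication
// performs. findIndex stops at the first match, so item i costs at most i + 1.
const countDedupComparisons = (items) => {
  let comparisons = 0;
  items.filter((value, idx, arr) =>
    arr.findIndex((el) => {
      comparisons++;
      return el.national_id === value.national_id;
    }) === idx
  );
  return comparisons;
};

// All-unique IDs are the worst case. Sizes are kept small so this runs instantly.
const uniqueBatch = (n) =>
  Array.from({ length: n }, (_, i) => ({ national_id: String(i) }));

console.log(countDedupComparisons(uniqueBatch(100)));  // 5050
console.log(countDedupComparisons(uniqueBatch(1000))); // 500500
```

Ten times more input costs about a hundred times more comparisons in the worst case, which is consistent with the non-linear jump seen at 75000 samples, although it does not prove that this is the only cause.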

Conclusion

Key takeaways

  1. Don't use the naive approach. It is really slow.
  2. The filtered approach breaks down on bigger INSERT sizes.
  3. The ON CONFLICT approach is fast, scales well, is ACID compliant, and fulfills the requirements. Use it.

Credit: Photo by Kendall Henderson on Unsplash
