This is the story of my journey improving a bulk insert algorithm. It's a mouthful of a title, but definitely a worthwhile read for this specific niche.
Intro
If you are:
- working on an ETL pipeline from various sources,
- scraping websites, or
- aggregating data,
chances are you will need a conditional bulk insert.
I use the term conditional bulk insert for an operation where many records need to be inserted into the DB at once, while ensuring there is no conflict on unique identifiers.
In this article, I'm going to compare three approaches to conditional bulk insert for Node.JS and PostgreSQL:
- Naive approach
- Filtered approach
- ON CONFLICT approach
Basic Implementation
For the purpose of demonstration, let's say there is a system for benefits program registration in country X. Data collection is carried out offline by field agents, and each agent uploads applicant data to the system at any given time. Suppose the system must check for duplicate national IDs of X: a citizen of X can only be registered once, under a single national ID. For simplicity, we will only store the first record with a unique national ID; if a duplicate arrives later, it will neither update the existing row nor get inserted into the database.
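To make the data shape concrete, here is an illustrative applicant record as an agent might upload it. The field names match the table defined below; the values are made up:
// illustrative applicant record (hypothetical values)
const exampleApplicant = {
  national_id: '3201234',   // unique per citizen; duplicates must be rejected
  person_name: 'Jane Doe',
  city: 'Springfield',
  benefit_amount: 2000000
};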
Now we will determine the best algorithm to insert data under such requirements.
Software requirements:
- Node.js v12.0 or later
- npm v6.0 or later
- PostgreSQL v9.5 or later
Project setup:
- Make a new project folder, for example: mkdir bulk_insert_demo
- Go to the directory: cd bulk_insert_demo
- Create a new Node project: npm init -y
- Install the necessary packages: npm install pg-promise faker
- Create a new file: index.js
- Create a new table, benefit_recipients, with the query below:
-- table definition
CREATE TABLE IF NOT EXISTS benefit_recipients (
id serial NOT NULL PRIMARY KEY,
national_id VARCHAR UNIQUE NOT NULL,
person_name VARCHAR DEFAULT NULL,
city VARCHAR DEFAULT NULL,
benefit_amount INT DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW() NOT NULL,
updated_at TIMESTAMP DEFAULT NULL
);
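One way to apply this DDL, assuming it is saved as schema.sql and the database db_name already exists (both names are just placeholders here), is via psql:
psql -d db_name -f schema.sql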
1. Naive Approach
The naive approach is: for each record, first check whether its ID already exists. If it does, do nothing; otherwise, insert the record with its new, unique ID.
Translating this to Node.JS with the pg-promise package is simple. I use the ES6 async-await style. Checking for national_id is done with SELECT COUNT(*), and the INSERT itself is straightforward. I put it all in a classic for loop because it plays well with async functions. Finally, I added some console output for information.
/**
* Naive approach: for every row, check if national_id exists in DB. If it doesn't, insert the row
* @param {*} client pg-promise client
* @param {*} bulkData data to insert as array of object
*/
const naiveBulkInsert = async (client, bulkData) => {
try {
let counter = 0;
for (let i = 0; i < bulkData.length; i++) {
const checkIdExists = await client.one('SELECT COUNT(*) FROM benefit_recipients WHERE national_id = $1', [bulkData[i].national_id]);
if (checkIdExists.count == 0) { // loose equality needed: COUNT(*) comes back as a string
await client.any(`
INSERT INTO benefit_recipients (national_id, person_name, city, benefit_amount)
VALUES ($1, $2, $3, $4)`,[bulkData[i].national_id, bulkData[i].person_name, bulkData[i].city,
bulkData[i].benefit_amount]);
counter++;
process.stdout.write(`Inserting ${counter} records...\r`);
}
}
process.stdout.write(`\nSuccessfully insert ${counter} records.\n`);
} catch (error) {
console.log(error);
}
}
Advantages
I have to admit, this is the kind of algorithm I wrote early in my career. It comes to mind immediately, and it is simple and readable. These are the advantages of the naive approach:
- Easy to implement
- Easy to add custom validation logic
- Can handle duplicates within the data source as well as between the source and the database
Disadvantages
- Runs a lot of queries (between N and 2N, where N is the number of records). We'll see the performance later.
- Not ACID compliant, unless wrapped in a transaction (see the sketch below).
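As a rough sketch of what that wrapping could look like with pg-promise's tx() helper (this is an assumption about how one might do it, not code from the benchmark):
/**
 * Hypothetical sketch: naive approach wrapped in a single transaction,
 * so either all checked-and-inserted rows commit or none do.
 */
const naiveBulkInsertTx = async (client, bulkData) => {
  await client.tx(async (t) => {
    for (let i = 0; i < bulkData.length; i++) {
      const { count } = await t.one(
        'SELECT COUNT(*) FROM benefit_recipients WHERE national_id = $1',
        [bulkData[i].national_id]
      );
      if (count == 0) { // loose equality: COUNT(*) comes back as a string
        await t.none(
          `INSERT INTO benefit_recipients (national_id, person_name, city, benefit_amount)
           VALUES ($1, $2, $3, $4)`,
          [bulkData[i].national_id, bulkData[i].person_name, bulkData[i].city, bulkData[i].benefit_amount]
        );
      }
    }
  });
}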
2. Filtered Approach
The naive approach has a drawback: it issues many queries. It is like using one truck to deliver a single box, then sending the truck back and forth until all boxes are delivered. Sending all boxes on one truck would be more sensible.
That is the basic idea of the filtered approach. Instead of checking N times and inserting 0 to N times, why not create one query for all the checking and one for the inserting? More precisely: one search query for duplicates between the data source and the database, then one insert query for the records that are not among those duplicates.
The diagram below explains what I wish to achieve.
For the duplicate search, a SELECT with a WHERE ... IN (...) clause will do. The query result then serves as the filtering criterion for the data source. I did some map() and filter() here; the methods can of course be chained, if that is preferable. After that, I used the helpers from pg-promise for inserting multiple records.
I found out later that this approach didn't resolve duplicates within the data source itself, for example if a benefit applicant registers more than once. So I fixed it by making an internal duplicate check the first step.
So, this is the full method for the filtered approach.
const pgp = require('pg-promise')();
/**
* Filtered approach:
* - Remove internal duplicates
* - Search intersection of national_id between bulk data & DB.
* - Then, filter out duplicates
* - Finally, insert the rest into DB
*
* @param {*} client pg-promise client
* @param {*} bulkData data to insert as array of object
*/
const filterBulkInsert = async (client, bulkData) => {
try {
// Resolve duplicates in bulkData. Can be more complex than this.
const uniqueBulkData = bulkData.filter((value,idx,arr)=>arr.findIndex(el=>(el.national_id === value.national_id))===idx);
const inputNationalIds = uniqueBulkData.map((item) => {return item.national_id});
const listDuplicates = await client.any('SELECT national_id FROM benefit_recipients WHERE national_id IN ($1:csv)',[inputNationalIds]);
const duplicatesArray = listDuplicates.map((item) => {return item.national_id});
const dataToInsert = uniqueBulkData.filter((item) => !duplicatesArray.includes(item.national_id));
const columns = Object.keys(bulkData[0]).map((str) => str.trim());
const setTable = new pgp.helpers.ColumnSet(columns , {table: 'benefit_recipients'});
const insert = pgp.helpers.insert(dataToInsert, setTable);
await client.any(insert);
console.log(`Successfully insert ${dataToInsert.length} records.`);
} catch (error) {
console.log(error);
}
}
Advantages
- Runs with only two queries
- The INSERT is ACID compliant
Disadvantages
- Doesn't handle duplicates within data source by default
- Has a lot of array manipulation (may be substituted with more complex queries)
3. ON CONFLICT Approach
I kept pushing to improve the query. It turned out that the simple PostgreSQL clause ON CONFLICT(...) DO NOTHING is the answer I needed. It gets rid of the SELECT query and the array manipulation from the previous approach. The code became simpler and more concise. However, note that the columns in ON CONFLICT(...) must be declared UNIQUE in the table definition.
The algorithm for the third approach is as follows:
const pgp = require('pg-promise')();
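/**
 * ON CONFLICT approach: insert everything in one query and
 * let PostgreSQL skip rows whose national_id already exists.
 * @param {*} client pg-promise client
 * @param {*} bulkData data to insert as array of object
 */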
const onConflictBulkInsert = async (client, bulkData) => {
try {
const columns = Object.keys(bulkData[0]).map((str) => str.trim());
const setTable = new pgp.helpers.ColumnSet(columns , {table: 'benefit_recipients'});
const onConflict = ' ON CONFLICT(national_id) DO NOTHING RETURNING *';
const insertOnConflict = pgp.helpers.insert(bulkData, setTable) + onConflict;
const result = await client.any(insertOnConflict);
console.log(`Successfully insert ${result.length} records.`);
} catch (error) {
console.log(error);
}
}
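For reference, the statement built by pgp.helpers.insert plus the appended clause looks roughly like this (a simplified, hand-written illustration with placeholder values, not actual library output):
INSERT INTO "benefit_recipients"("national_id","person_name","city","benefit_amount")
VALUES ('3201234','Jane Doe','Springfield',2000000),
       ('3205678','John Smith','Shelbyville',1500000)
ON CONFLICT(national_id) DO NOTHING RETURNING *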
Advantages
- Runs with only ONE query
- ACID compliant
- Can handle duplicates within the data source as well as between the source and the database
Disadvantages
None so far.
Testing Setup
For demonstration purposes, I define some testing strategies:
- national_id length and bulk data size are parameterized to ensure duplicate values. For example, suppose national_id is a three-digit number with 1000 possible values (0-999). If I create 750 samples, then according to the birthday problem there is a >99% chance of national_id duplicates (a quick check is sketched below).
- Tests are run iteratively in various scenarios. First, each test is run independently on small samples. Then, the sample size is increased accordingly. Finally, there is also a test that runs the three algorithms sequentially in random order to further measure performance.
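As a quick sanity check of that claim (not part of the benchmark, and assuming the generated IDs are roughly uniform), the probability of at least one duplicate when drawing n IDs from d possible values can be computed in log space:
// Birthday-problem check: P(at least one duplicate) when drawing n IDs
// uniformly at random from d possible values.
// P(no duplicate) = prod_{k=0}^{n-1} (d - k) / d, summed as logs to avoid underflow.
const duplicateProbability = (n, d) => {
  let logPNoDuplicate = 0;
  for (let k = 0; k < n; k++) {
    logPNoDuplicate += Math.log((d - k) / d);
  }
  return 1 - Math.exp(logPNoDuplicate);
};

console.log(duplicateProbability(750, 1000)); // ≈ 1, i.e. far above 99%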
For testing purposes, I created some functions:
generateSample() creates an array of objects, with two parameters: sample size and national_id length.
const faker = require('faker');
const generateSample = (size, idLength) => {
let sample = [];
for (let i = 0; i < size; i++) {
sample.push({
// national_id, center of this test, is defined with specific idLength
national_id : faker.finance.account(idLength),
person_name : faker.name.firstName() + ' ' + faker.name.lastName(),
city : faker.address.cityName(),
benefit_amount : faker.finance.amount(100, 500, 0) * 10000
});
}
return sample;
}
cleanTable() deletes all records from benefit_recipients. It is run after each insert test; otherwise, everything would be a duplicate the next time around, right?
const cleanTable = async (client) => {
// I don't use TRUNCATE because I will display number of deleted records
const deleteRecords = await client.any(`DELETE FROM benefit_recipients
WHERE created_at > (current_timestamp - interval '1 day')
RETURNING *`);
console.log(`Deleted ${deleteRecords.length} records.`);
}
runNaiveTest(), runFilteredTest(), and runOnConflictTest() each consist of timing, sample generation, and INSERT execution.
const runNaiveTest = async (client, sampleSize, idLength) => {
try {
console.time('naive_approach');
console.log('executing naive approach...');
console.log(`creating ${sampleSize} samples...`);
let sample1 = generateSample(sampleSize, idLength);
await naiveBulkInsert(client, sample1);
console.timeEnd('naive_approach');
} catch (error) {
console.log(error);
}
}
const runFilteredTest = async (client, sampleSize, idLength) => {
try {
console.time('filter_approach');
console.log('executing filter approach...');
console.log(`creating ${sampleSize} samples...`);
let sample2 = generateSample(sampleSize, idLength);
await filterBulkInsert(client, sample2);
console.timeEnd('filter_approach');
} catch (error) {
console.log(error);
}
}
const runOnConflictTest = async (client, sampleSize, idLength) => {
try {
console.time('on_conflict_approach');
console.log('executing on conflict approach...');
console.log(`creating ${sampleSize} samples...`);
let sample3 = generateSample(sampleSize, idLength);
await onConflictBulkInsert(client, sample3);
console.timeEnd('on_conflict_approach');
} catch (error) {
console.log(error);
}
}
And finally, an adjustable main() function. It runs a specific test with a defined number of iterations, sample size, national_id length, and PostgreSQL client. In the example below, I run a sequential test of the naive, filtered, and ON CONFLICT approaches, 10 times.
If I want to test each approach independently, I can adjust it here and re-run the test.
const main = async () => {
const dbClient = pgp(process.env.DB_CLIENT);
const iteration = 10;
// to create >99% duplicate guarantee, I use formula (samples) / (10^idLength) = 0.75
const samples = 750;
const idLength = 3;
// adjust desired test here
for (let idx = 0; idx < iteration; idx++) {
await runNaiveTest(dbClient,samples,idLength);
await runOnConflictTest(dbClient,samples,idLength);
await runFilteredTest(dbClient,samples,idLength);
await cleanTable(dbClient);
}
}
main();
All of the code above can be written in a single index.js file, as I did, or spread across separate files with proper imports. Then run:
DB_CLIENT=postgres://theusername:thepassword@localhost:5432/db_name node index.js
changing the DB_CLIENT value to the correct connection string.
Testing Result
First, I tested each approach independently with 750 samples.
This is the result.
Well, I expected the naive approach to be slow, but not THAT slow. It is almost 2,000 times slower than the filtered and ON CONFLICT approaches. That number might vary for several reasons; my database is on a remote server, so network latency is a factor. One thing is certain: the naive approach is much slower than the others. First lesson learned: never use the naive approach. From this point on, I'll exclude the naive approach from testing.
On a side note, the filtered approach is a bit faster than the ON CONFLICT approach. We'll see more of them later.
This time, I increase the sample size to 7,500 and adjust national_id to 4 digits. This is the performance result.
For 7,500 samples, the filtered approach is about 33% faster than the ON CONFLICT approach.
Then, I increase the sample size to 75,000. This time, there's a surprise.
The filtered approach takes much longer this time, and its runtime doesn't grow linearly from the previous sample. On the other hand, the ON CONFLICT approach seems to scale linearly, as its execution time increases consistently.
This is likely caused by the array manipulation. In the filtered approach, the array manipulation is carried out by Node.JS. The filter() and map() calls, especially when nested (here, findIndex() and includes() lookups inside filter(), which is roughly O(N²)), are expensive, and JavaScript is not particularly fast at this. At bigger sample sizes, it breaks down. That is not the case for the ON CONFLICT approach: the heavy lifting is done by PostgreSQL internals, which have been optimized for this kind of relational work.
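To illustrate that point (not something I benchmarked here), the quadratic part of the filtered approach could in principle be reduced by using a Set for the membership check, though the approach would still need its two round trips:
// Hypothetical tweak to the filtered approach: O(1) membership checks via a Set
// instead of Array.prototype.includes (which is O(N) per lookup).
const filterWithSet = (uniqueBulkData, listDuplicates) => {
  const duplicateIds = new Set(listDuplicates.map((item) => item.national_id));
  return uniqueBulkData.filter((item) => !duplicateIds.has(item.national_id));
};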
Conclusion
Key takeaways
- Don't use the naive approach. It is really slow.
- The filtered approach breaks down at bigger INSERT sizes.
- The ON CONFLICT approach is fast, scales well, is ACID compliant, and fulfills the requirements. Use it.
Credit: Photo by Kendall Henderson on Unsplash