Introduction
Worker threads in Node.js let you offload CPU-intensive tasks from the single-threaded process that Node.js gives you.
First, we need to understand why you shouldn't put a CPU-intensive task on the main thread of your Node.js process. Node.js runs your JavaScript on a single thread, and out of the box you get only one process. (In Node.js, process is also a global object that provides information about, and control over, the currently running process.)
I have but One Thread to give - Node.js
The decision to make Node.js single-threaded came from not wanting to change the design of the language itself: adding a multithreading module to JavaScript could change the way the language is written.
Node.js has a single event loop. This is what gives Node its asynchronous nature: it offloads operations to the system's kernel where possible and gets the results back through callbacks, promises and async/await, so we don't have to worry about concurrency problems.
This becomes a problem when you have a CPU-intensive task to run, for example a long-running synchronous task or a complex mathematical calculation. While such a task runs, it blocks the thread, so every other task scheduled at that time has to wait. In an API server, any other HTTP request that arrives during that time is blocked too, which keeps the end user waiting. Worker threads are one solution to this.
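To make the blocking concrete, here is a minimal sketch (not from the original article) in which a synchronous loop delays a timer that asked to run after 0 ms. The busyWait helper is hypothetical and simply stands in for any long synchronous computation.

```javascript
// Hypothetical helper: spin on the CPU for roughly `ms` milliseconds.
// It stands in for any long, synchronous computation.
const busyWait = (ms) => {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // Nothing else on the main thread can run while this loop spins.
  }
};

const start = Date.now();

// Ask for this callback to run as soon as possible (after 0 ms)...
setTimeout(() => {
  console.log(`Timer fired after ${Date.now() - start} ms`);
}, 0);

// ...but the event loop is busy here, so the timer has to wait.
busyWait(200);
```

The timer reports a delay of at least about 200 ms instead of roughly 0 ms, because its callback cannot run until the synchronous loop returns. An incoming HTTP request would be held up in the same way.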
Working with Worker Threads
We will use worker threads to calculate the Fibonacci numbers of several inputs, and we will use Atomics and SharedArrayBuffer to help us handle race conditions between our threads.
Check out this wonderful article about Shared buffers and Atomics here
To use worker threads, import the worker_threads module into your file:
const { Worker } = require('worker_threads');
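Before the full example, a minimal round trip may help show the shape of the API. This sketch keeps everything in one file by passing the worker's code as a string with the eval: true option and handing it input through workerData; the runInWorker helper is hypothetical, and the article itself uses a separate worker.js file instead.

```javascript
const { Worker } = require('worker_threads');

// The worker's code as a string; with `eval: true`, Node runs the string
// as JavaScript instead of treating it as a file path.
const workerSource = `
  const { parentPort, workerData } = require('worker_threads');
  // Do the work off the main thread, then send the result back.
  parentPort.postMessage(workerData * 2);
`;

// Hypothetical helper: run one job in a fresh worker and resolve with its reply.
const runInWorker = (input) =>
  new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: input });
    worker.once('message', resolve); // result posted by the worker
    worker.once('error', reject);    // exception thrown inside the worker
  });

runInWorker(21).then((result) => console.log('Worker replied:', result));
```

This prints "Worker replied: 42". Wrapping the worker in a Promise is only one design choice; listening for events directly, as main.js does, works just as well.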
Main Process
// main.js
const { Worker } = require("worker_threads");

const runFibonnaci = (nums) => {
  // Number of elements in the input array
  const length = nums.length;
  // Bytes needed: 4 bytes per 32-bit integer times the number of elements
  const size = Int32Array.BYTES_PER_ELEMENT * length;
  // Create a shared buffer big enough for the whole input array
  const sharedBuffer = new SharedArrayBuffer(size);
  const sharedArray = new Int32Array(sharedBuffer);

  for (let i = 0; i < length; i++) {
    // Store each value in the sharedArray
    Atomics.store(sharedArray, i, nums[i]);
    // Spin up a new worker thread (a "./" path is resolved relative to the current working directory)
    const worker = new Worker('./worker.js');
    // Once the calculation is done, print the result
    worker.once('message', (message) => {
      console.log('Result received --- ', message);
    });
    // Report any error thrown inside the worker
    worker.once('error', (err) => {
      console.error('Worker error --- ', err);
    });
    // Send the shared array and the index to the worker thread
    worker.postMessage({ data: sharedArray, index: i });
  }
};

runFibonnaci([50, 20, 21, 24, 4]);
The runFibonnaci function accepts an array of numbers whose Fibonacci values are calculated in worker threads. The size variable holds the number of bytes the array needs (Int32Array.BYTES_PER_ELEMENT, which is 4, times the number of elements), and the sharedBuffer variable is a SharedArrayBuffer of exactly that size.
// Number of elements in the input array
const length = nums.length;
// Bytes needed: 4 bytes per 32-bit integer times the number of elements
const size = Int32Array.BYTES_PER_ELEMENT * length;
// Create a shared buffer big enough for the whole input array
const sharedBuffer = new SharedArrayBuffer(size);
const sharedArray = new Int32Array(sharedBuffer);
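Worked through for the five-element input passed to runFibonnaci, the sizing arithmetic and the store/load round trip look like this. It is a standalone sketch that repeats the setup from main.js outside of any worker.

```javascript
const nums = [50, 20, 21, 24, 4];

// Int32Array.BYTES_PER_ELEMENT is 4, so five elements need 4 * 5 = 20 bytes.
const size = Int32Array.BYTES_PER_ELEMENT * nums.length;
const sharedBuffer = new SharedArrayBuffer(size);
const sharedArray = new Int32Array(sharedBuffer);

// Write every input into the shared memory, as main.js does in its loop.
nums.forEach((value, i) => Atomics.store(sharedArray, i, value));

console.log(size);                         // 20
console.log(sharedBuffer.byteLength);      // 20
console.log(sharedArray.length);           // 5
console.log(Atomics.load(sharedArray, 0)); // 50
```

Note that an Int32Array can only hold values between -2^31 and 2^31 - 1, which is plenty for these inputs; the (much larger) Fibonacci results are sent back with postMessage rather than written into this array.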
The sharedArray variable is an Int32Array view over sharedBuffer, that is, an array of 32-bit signed integers stored in the shared memory. We use Atomics to store values in sharedArray, and because it is backed by a SharedArrayBuffer, every worker thread that receives it reads from the same block of memory instead of from a copy. Atomics operations work on integer typed arrays backed by either a SharedArrayBuffer or an ArrayBuffer.
According to the MDN docs: "When memory is shared, multiple threads can read and write the same data in memory. Atomic operations make sure that predictable values are written and read, that operations are finished before the next operation starts and that operations are not interrupted."
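To see the "not interrupted" guarantee at work, here is a minimal sketch (separate from the Fibonacci example) in which several workers increment one shared counter. The worker code is inlined as a string with eval: true, and the countWithWorkers helper is hypothetical.

```javascript
const { Worker } = require('worker_threads');

// Worker body, passed as a string with `eval: true`. Each worker adds 1 to
// slot 0 of the shared counter `iterations` times. Atomics.add performs the
// read, the addition and the write as one indivisible operation.
const workerSource = `
  const { workerData } = require('worker_threads');
  const counter = new Int32Array(workerData.buffer);
  for (let i = 0; i < workerData.iterations; i++) {
    Atomics.add(counter, 0, 1);
  }
`;

// Hypothetical helper: start `workerCount` workers that all increment the
// same shared counter, then resolve with its final value.
const countWithWorkers = (workerCount, iterations) => {
  // One 32-bit slot of shared memory; a new SharedArrayBuffer starts zeroed.
  const buffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
  const counter = new Int32Array(buffer);

  const finished = [];
  for (let w = 0; w < workerCount; w++) {
    // workerData is cloned, but a SharedArrayBuffer inside it is shared, not copied.
    const worker = new Worker(workerSource, {
      eval: true,
      workerData: { buffer, iterations },
    });
    finished.push(
      new Promise((resolve, reject) => {
        worker.once('exit', resolve);
        worker.once('error', reject);
      })
    );
  }

  // After every worker has exited, read the final value.
  return Promise.all(finished).then(() => Atomics.load(counter, 0));
};

countWithWorkers(4, 100000).then((total) => console.log('Total:', total));
```

With Atomics.add this always prints "Total: 400000". If the loop used a plain counter[0]++ instead, the read and the write could interleave between threads and some increments could be lost; whether that happens on a given run depends on timing.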
We then loop through the nums array passed into the runFibonnaci function and store each value in sharedArray using the static Atomics.store method.
for (let i = 0; i < length; i++) {
  // Store each value in the sharedArray
  Atomics.store(sharedArray, i, nums[i]);
  // Spin up a new worker thread
  const worker = new Worker('./worker.js');
  // Once the calculation is done, print the result
  worker.once('message', (message) => {
    console.log('Result received --- ', message);
  });
  // Send the shared array and the index to the worker thread
  worker.postMessage({ data: sharedArray, index: i });
}
Inside the same loop, we spin up a new worker thread for each element and send it the sharedArray together with the index it should work on. The callback registered with worker.once('message') runs once the worker thread has finished its task and posted a value back, as shown in the worker file below.
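If you need all the results together, for example to return them from an API handler, one option is to wrap each worker in a Promise and wait with Promise.all, which keeps the results in input order. This is a hypothetical variant (runFibonacciAll is not from the article's source code); it inlines the logic of worker.js as a string with eval: true so the sketch fits in one file.

```javascript
const { Worker } = require('worker_threads');

// Same logic as worker.js, inlined as a string so the sketch fits in one file.
const workerSource = `
  const { parentPort } = require('worker_threads');
  const calculateFibonacci = (num) => {
    let a = 1, b = 0, temp;
    while (num >= 0) {
      temp = a;
      a = a + b;
      b = temp;
      num--;
    }
    return b;
  };
  parentPort.once('message', ({ data, index }) => {
    parentPort.postMessage(calculateFibonacci(Atomics.load(data, index)));
  });
`;

// Hypothetical variant of runFibonnaci that resolves with every result,
// in the same order as the input array.
const runFibonacciAll = (nums) => {
  const sharedArray = new Int32Array(
    new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * nums.length)
  );
  const jobs = nums.map((value, i) => {
    Atomics.store(sharedArray, i, value);
    return new Promise((resolve, reject) => {
      const worker = new Worker(workerSource, { eval: true });
      worker.once('message', resolve);
      worker.once('error', reject);
      worker.postMessage({ data: sharedArray, index: i });
    });
  });
  return Promise.all(jobs);
};

runFibonacciAll([50, 20, 21, 24, 4]).then((results) => console.log(results));
```

The workers still finish in whatever order they finish, but Promise.all places each result at the position of its input, so this logs the values for 50, 20, 21, 24 and 4 in that order.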
Worker Process
// worker.js
const { parentPort } = require('worker_threads');

// Iterative Fibonacci. The loop runs num + 1 times, so this returns
// 1, 1, 2, 3, 5, ... for num = 0, 1, 2, 3, 4, ...
const calculateFibonacci = (num) => {
  let a = 1, b = 0, temp;
  while (num >= 0) {
    temp = a;
    a = a + b;
    b = temp;
    num--;
  }
  return b;
};

// Listen for a single message from the main thread
parentPort.once('message', (event) => {
  const sharedArray = event.data;
  const index = event.index;
  // Read this worker's input from shared memory
  const arrValue = Atomics.load(sharedArray, index);
  const fibonacciValue = calculateFibonacci(arrValue);
  // Send the result back to the main thread
  parentPort.postMessage(fibonacciValue);
});
The callback passed to parentPort.once runs when the main thread sends the worker its data. It reads sharedArray and index from the message, and arrValue fetches this worker's input from sharedArray using Atomics.load. The worker then calculates the Fibonacci value by calling the calculateFibonacci function and posts the result back to the main thread, which prints it to the console.
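Because the loop in calculateFibonacci runs num + 1 times, its indexing is worth checking on its own. This sketch copies the function out of worker.js and runs it on the main thread for a few inputs.

```javascript
// The loop from worker.js, copied here so it can run on its own.
const calculateFibonacci = (num) => {
  let a = 1, b = 0, temp;
  while (num >= 0) {
    temp = a;
    a = a + b;
    b = temp;
    num--;
  }
  return b;
};

// The result is F(num + 1) when the sequence is indexed as F(0) = 0, F(1) = 1.
console.log([0, 1, 2, 3, 4].map(calculateFibonacci)); // [ 1, 1, 2, 3, 5 ]
console.log(calculateFibonacci(20));                   // 10946
console.log(calculateFibonacci(50));                   // 20365011074
```

The function uses ordinary JavaScript numbers, so its results are exact only while they stay at or below Number.MAX_SAFE_INTEGER (2^53 - 1); larger inputs would need a BigInt version.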
You can run the code with this command:
node main.js
Because the workers run in parallel, the results can be printed in a different order from the input array:
// console
Result received ---  20365011074
Result received ---  17711
Result received ---  75025
Result received ---  10946
Result received ---  5
Conclusion
Worker threads help a Node.js application by running CPU-intensive tasks off the main thread. They don't magically make your application faster, but they help in situations where a particular set of instructions would otherwise block the single main thread and make other tasks wait or fail.
Source code can be found here
Photo by K15 Photos on Unsplash