Basic principles that can help us keep the nesting level low and improve the organization of our code in general in callbacks:
- Exit as soon as possible.
- Create named functions for callbacks, keeping them out of closures and passing intermediate results as arguments.
- Modularize the code.
# Sequential execution
Executing a set of tasks in sequence means running them one at a time, one after the other.
Different variations of this flow:
- Executing a set of known tasks in sequence, without propagating data across them.
- Using the output of a task as the input for the next (also known as chain, pipeline, or waterfall).
- Iterating over a collection while running an asynchronous task on each element, one after the other.
Executing a known set of tasks in sequence
function task1 (cb) {
asyncOperation(() => {
task2(cb);
});
}
function task2 (cb) {
asyncOperation(() => {
task3(cb);
});
}
function task3 (cb) {
asyncOperation(() => {
cb();
});
}
task1(() => {
// executed when task1, task2 and task3 are completed
console.log('tasks 1, 2 and 3 executed')
});
Sequential iteration
This pattern is helpful when we want to execute an asynchronous operation for each item in a collection.
Pattern with callback
function iterate (index) {
if (index === tasks.length) {
return cb();
}
const task = tasks[index];
task(() => {
iterate(index + 1);
});
}
function cb () {
// iteration completed
}
iterate(0);
Pattern with Promise
Promisify a callback based API
function promisify (cbBasdApi) {
return function promisified (...args) {
return new Promise((resolve, reject) => {
const newArgs = [
...args,
function (err, result) {
if (err) {
return reject(err);
}
resolve(result);
}
];
cbBasedApi(...newArgs);
});
};
}
function iterate(index) {
let promise = Promise.resolve();
if (index === tasks.length) {
return promise.then(finish);
}
for (const task of tasks) {
promise = promise.then(() => iterate(index + 1));
}
return promise;
}
function finish() {
// iteration completed
console.log('All tasks completed');
}
const promise = tasks.reduce((prev, task) => {
return prev.then(() => {
return task();
});
}, Promise.resolve());
❗ This algorithm becomes recursive if task()
is a synchronous operation. There might be a risk of hitting the maximum call stack size limit.
⭐ This pattern can be used to create asynchronous versions of Array methods.
function iterateSeries(collection, iteratorCallback, finalCallback) {
if (collection.length === 0) {
finalCallback();
return;
}
const currentItem = collection.shift();
iteratorCallback(currentItem, () => {
iterateSeries(collection, iteratorCallback, finalCallback);
});
}
// Example
const collection = [1, 2, 3, 4, 5];
function iteratorCallback(item, callback) {
// Simulate asynchronous operation (e.g., API call, file I/O) with setTimeout
setTimeout(() => {
console.log(item);
callback(); // Call the callback to proceed to the next iteration
}, 1000); // Delay of 1 second for demonstration
}
function finalCallback() {
console.log("All iterations are complete");
}
iterateSeries(collection, iteratorCallback, finalCallback);
# Parallel execution
In parallel execution, we are not concerned about the order of execution of a set of asynchronous tasks. Instead, all we need is to get notified when all those running tasks are completed.
Pattern with callback
const tasks = [ /* ... */ ];
let completed = 0;
tasks.forEach(task => {
task(() => {
if (++completed === tasks.length) {
finish();
}
});
});
function finish () {
// all the tasks completed
}
Pattern with Promise
const tasks = [/* ... */];
// Map each task to a Promise that resolves when the task is completed
const promises = tasks.map(task => new Promise(resolve => task(resolve)));
Promise.all(promises)
.then(() => {
cb();
})
.catch(error => {
cb(error);
});
function cb () {
// all the tasks completed
}
The Unlimited Parallel Execution pattern
The Unlimited Parallel Execution pattern, also known as Parallel Limitless, allows you to execute multiple asynchronous tasks concurrently without any predefined limit on the number of tasks running simultaneously. This pattern is useful when you have a large number of asynchronous operations to perform, such as making multiple API requests, and you want to maximize efficiency by executing as many tasks in parallel as possible without overwhelming the system.
async function unlimitedParallelExecution(tasks, taskExecutor) {
const promises = [];
for (const task of tasks) {
promises.push(taskExecutor(task));
}
await Promise.all(promises);
}
// Example:
async function taskExecutor(task) {
// Simulate asynchronous operation (e.g., API call, file I/O) with setTimeout
return new Promise((resolve) => {
setTimeout(() => {
console.log('Task completed:', task);
resolve();
}, Math.random() * 2000); // Random delay up to 2 seconds for demonstration
});
}
const tasks = ['Task 1', 'Task 2', 'Task 3', 'Task 4', 'Task 5'];
unlimitedParallelExecution(tasks, taskExecutor)
.then(() => {
console.log('All tasks completed');
})
.catch((error) => {
console.error('Error:', error);
});
Fixing race conditions
To avoid race conditions, all we need to do is to introduce a variable of some type to mutually exclude multiple similar tasks.
# Limited parallel execution
A server that spawns unbounded parallel tasks to handle a user request could be exploited with a denial-of-service (DoS) attack.
Limiting the number of parallel tasks is, in general, a good practice that helps with building resilient applications.
Limiting concurrency
const tasks = [/* ... */];
const concurrencyLimit = 3;
async function executeTasksWithLimit(tasks, concurrencyLimit) {
const runningTasks = [];
const results = [];
const runTask = task => {
task((err, res) => {
if (err) {
results.push({ error });
return;
}
results.push(res);
});
// Remove the task from the list of running tasks
runningTasks.splice(runningTasks.indexOf(task), 1);
// If there are more tasks to run, start the next one
if (tasks.length > 0) {
startNextTask();
}
};
const startNextTask = () => {
const task = tasks.shift(); // Get the next task
if (task) {
runningTasks.push(task);
runTask(task);
}
};
for (let i = 0; i < concurrencyLimit && i < tasks.length; i++) {
startNextTask();
}
}
Limited concurrency pattern works very well when we have a predetermined set of tasks to execute, or when the set of tasks grows linearly over time.
Limited concurrency with queues
import { EventEmitter } from 'events';
export class TaskQueue {
constructor (concurrency) {
super();
this.concurrency = concurrency;
this.running = 0;
this.queue = [];
}
runTask (task) {
return new Promise((resolve, reject) => {
this.queue.push(() => {
return task().then(resolve, reject);
});
process.nextTick(this.next.bind(this));
});
}
next () {
if (this.running === 0 && this.queue.length === 0) {
return this.emit("empty");
}
while (this.running < this.concurrency && this.queue.length) {
const task = this.queue.shift();
task()
.catch(err => {
this.emit("error", err);
})
.finally(() => {
this.running--;
process.nextTick(this.next.bind(this));
});
this.running++;
}
}
}
Example of spider web app
Sequential execution and iteration
function spiderLinks (currentUrl, content, nesting, queue) {
let promise = Promise.resolve();
if (nesting === 0) {
return promise;
}
const links = getPageLinks(currentUrl, content);
for (const link of links) {
promise = promise.then(() =>
spiderTask(link, nesting - 1, queue));
}
return promise;
}
Parallel execution
function spiderLinks (currentUrl, content, nesting, queue) {
if (nesting === 0) {
return Promise.resolve();
}
const links = getPageLinks(currentUrl, content);
const promises = links
.map(link => spiderTask(link, nesting - 1, queue));
return Promise.all(promises);
}
const spidering = new Set();
function spiderTask (url, nesting, queue) {
if (spidering.has(url)) {
return Promise.resolve();
}
spidering.add(url);
const fileName = urlToFilename(url);
return queue
.runTask(() => {
return fsPromises.readFile(filename, 'utf8')
.catch(err => {
if (err.code !== 'ENOENT') {
throw err;
}
return download(url, fileName);
});
})
.then(content => spiderLinks(url, content, nesting, queue));
}
export function spider (url, nesting, concurrency) {
const queue = new TaskQueue(concurrency);
return spiderTask(url, nesting, queue);
}
Refresher on Promises
new Promise((resolve, reject) => {})
creates a new Promise instance that fulfills or rejects based on the behavior of the function provided as an argument.
Useful Promise methods:
Promise.resolve(obj)
Promise.reject(err)
Promise.all(iterable)
-
Promise.allSettled(iterable)
=> returns an object with structure
{ status: 'fulfilled' | 'rejected'; reason?: any; // present in case of rejection value?: any; // present in case of fulfillment }
Promise.race(iterable)
promise.then(onFulfilled, onRejected)
promise.catch(onRejected)
promise.finally(onFinally)
Refresher on async/await
In an async
function, at each await
expression, the execution of the function is put on hold, its state saved, and the control returned to the event loop. Once the Promise that has been awaited resolves, the control is given back to the async function, returning the fulfilment value of the Promise.
❗❗ BEWARE
links.forEach(async function iteration(link) {
await spider(link, nesting - 1);
});
This piece of code will start the tasks in parallel instead of sequential iteration and the execution continues immediately after invoking forEach()
, without waiting for all the tasks to complete.
Limited parallel execution with async/await
export class TaskQueuePC {
constructor (concurrency) {
this.taskQueue = [];
this.consumerQueue = [];
// Spawn as many consumers as the concurrency we want to attain
for (let i = 0; i < concurrency; i++) {
this.consumer();
}
}
async consumer () {
while (true) {
try {
const task = await this.getNextTask();
await task();
} catch (err) {
console.error(err);
}
}
}
async getNextTask () {
return new Promise((resolve) => {
if (this.taskQueue.length !== 0) {
return resolve(this.taskQueue.shift());
}
// If queue is empty, then we postpone the resolution of the Promise by queuing the resolve callback into the consumerQueue.
this.consumerQueue.push(resolve);
});
}
runTask (task) {
return new Promise((resolve, reject) => {
const taskWrapper = () => {
const taskPromise = task();
taskPromise.then(resolve, reject);
return taskPromise;
};
// If the consumerQueue is not empty, then there is at least one consumer that is asleep, waiting for a new task to run.
if (this.consumerQueue.length !== 0) {
const consumer = this.consumerQueue.shift();
consumer(taskWrapper);
// If the consumerQueue is empty, means all the consumers are busy
} else {
this.taskQueue.push(taskWrappeer);
}
});
}
}
# Tackling a problem of memory leak associated with Promises
Consider the following code snippets:
function leakingLoop () {
return delay(1)
.then(() => {
console.log(`Tick ${Date.now()}`);
return leakingLoop();
});
}
async function leakingLoopAsync () {
await delay(1);
console.log(`Tick ${Date.now()}`);
return leakingLoopAsync();
}
This situation creates a chain of promises that never settle, and it will cause a memory leak in Promise implementations.
The solution is to break the chain of Promise resolution while considering error handling from anywehere deep in the recursion as follows:
function nonLeakingLoop () {
return new Promise((resolve, reject) => {
(function internalLoop () {
delay(1)
.then(() => {
console.log(`Tick ${Date.now()}`);
internalLoop();
})
.catch(err => {
reject(err);
});
})();
});
}
Top comments (0)