DEV Community

Manoj Kumar Patra
Manoj Kumar Patra

Posted on

Asynchronous Control Flow Patterns

Basic principles that can help us keep the nesting level low and improve the organization of our code in general in callbacks:

  • Exit as soon as possible.
  • Create named functions for callbacks, keeping them out of closures and passing intermediate results as arguments.
  • Modularize the code.

# Sequential execution

Executing a set of tasks in sequence means running them one at a time, one after the other.

Different variations of this flow:

  • Executing a set of known tasks in sequence, without propagating data across them.
  • Using the output of a task as the input for the next (also known as chain, pipeline, or waterfall).
  • Iterating over a collection while running an asynchronous task on each element, one after the other.

Executing a known set of tasks in sequence


function task1 (cb) {
  asyncOperation(() => {
    task2(cb);
  });
}

function task2 (cb) {
  asyncOperation(() => {
    task3(cb);
  });
}

function task3 (cb) {
  asyncOperation(() => {
    cb();
  });
}

task1(() => {
  // executed when task1, task2 and task3 are completed
  console.log('tasks 1, 2 and 3 executed')
});

Enter fullscreen mode Exit fullscreen mode

Sequential iteration

This pattern is helpful when we want to execute an asynchronous operation for each item in a collection.

Pattern with callback


function iterate (index) {
  if (index === tasks.length) {
    return cb();
  }

  const task = tasks[index];
  task(() => {
    iterate(index + 1);
  });
}

function cb () {
  // iteration completed
}

iterate(0);

Enter fullscreen mode Exit fullscreen mode

Pattern with Promise

Promisify a callback based API


function promisify (cbBasdApi) {
  return function promisified (...args) {
    return new Promise((resolve, reject) => {
      const newArgs = [
        ...args,
        function (err, result) {
          if (err) {
            return reject(err);
          }

          resolve(result);
        }
      ];

      cbBasedApi(...newArgs);
    });
  };
}

Enter fullscreen mode Exit fullscreen mode

function iterate(index) {
  let promise = Promise.resolve();

  if (index === tasks.length) {
    return promise.then(finish);
  }

  for (const task of tasks) {
    promise = promise.then(() => iterate(index + 1));
  }

  return promise;
}

function finish() {
  // iteration completed
  console.log('All tasks completed');
}

Enter fullscreen mode Exit fullscreen mode

const promise = tasks.reduce((prev, task) => {
  return prev.then(() => {
    return task();
  });
}, Promise.resolve());

Enter fullscreen mode Exit fullscreen mode

❗ This algorithm becomes recursive if task() is a synchronous operation. There might be a risk of hitting the maximum call stack size limit.

⭐ This pattern can be used to create asynchronous versions of Array methods.


function iterateSeries(collection, iteratorCallback, finalCallback) {
  if (collection.length === 0) {
    finalCallback();
    return;
  }

  const currentItem = collection.shift();

  iteratorCallback(currentItem, () => {
    iterateSeries(collection, iteratorCallback, finalCallback);
  });
}

// Example
const collection = [1, 2, 3, 4, 5];

function iteratorCallback(item, callback) {
  // Simulate asynchronous operation (e.g., API call, file I/O) with setTimeout
  setTimeout(() => {
      console.log(item);
      callback(); // Call the callback to proceed to the next iteration
  }, 1000); // Delay of 1 second for demonstration
}

function finalCallback() {
  console.log("All iterations are complete");
}

iterateSeries(collection, iteratorCallback, finalCallback);

Enter fullscreen mode Exit fullscreen mode

# Parallel execution

In parallel execution, we are not concerned about the order of execution of a set of asynchronous tasks. Instead, all we need is to get notified when all those running tasks are completed.

Pattern with callback


const tasks = [ /* ... */ ];

let completed = 0;
tasks.forEach(task => {
  task(() => {
    if (++completed === tasks.length) {
      finish(); 
    }
  }); 
});

function finish () {
  // all the tasks completed
}

Enter fullscreen mode Exit fullscreen mode

Pattern with Promise


const tasks = [/* ... */];

// Map each task to a Promise that resolves when the task is completed
const promises = tasks.map(task => new Promise(resolve => task(resolve)));

Promise.all(promises)
  .then(() => {
    cb();
  })
  .catch(error => {
    cb(error);
  });

function cb () {
  // all the tasks completed
}

Enter fullscreen mode Exit fullscreen mode

The Unlimited Parallel Execution pattern

The Unlimited Parallel Execution pattern, also known as Parallel Limitless, allows you to execute multiple asynchronous tasks concurrently without any predefined limit on the number of tasks running simultaneously. This pattern is useful when you have a large number of asynchronous operations to perform, such as making multiple API requests, and you want to maximize efficiency by executing as many tasks in parallel as possible without overwhelming the system.


async function unlimitedParallelExecution(tasks, taskExecutor) {
  const promises = [];

  for (const task of tasks) {
    promises.push(taskExecutor(task));
  }

  await Promise.all(promises);
}

// Example:

async function taskExecutor(task) {
  // Simulate asynchronous operation (e.g., API call, file I/O) with setTimeout
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('Task completed:', task);
      resolve();
    }, Math.random() * 2000); // Random delay up to 2 seconds for demonstration
  });
}

const tasks = ['Task 1', 'Task 2', 'Task 3', 'Task 4', 'Task 5'];

unlimitedParallelExecution(tasks, taskExecutor)
  .then(() => {
    console.log('All tasks completed');
  })
  .catch((error) => {
    console.error('Error:', error);
  });

Enter fullscreen mode Exit fullscreen mode

Fixing race conditions

To avoid race conditions, all we need to do is to introduce a variable of some type to mutually exclude multiple similar tasks.

# Limited parallel execution

A server that spawns unbounded parallel tasks to handle a user request could be exploited with a denial-of-service (DoS) attack.

Limiting the number of parallel tasks is, in general, a good practice that helps with building resilient applications.

Limiting concurrency


const tasks = [/* ... */];

const concurrencyLimit = 3;

async function executeTasksWithLimit(tasks, concurrencyLimit) {
  const runningTasks = [];
  const results = [];

  const runTask = task => {
    task((err, res) => {
      if (err) {
        results.push({ error });
        return;
      }
      results.push(res);
    });

    // Remove the task from the list of running tasks
    runningTasks.splice(runningTasks.indexOf(task), 1);
    // If there are more tasks to run, start the next one
    if (tasks.length > 0) {
      startNextTask();
    }
  };

  const startNextTask = () => {
    const task = tasks.shift(); // Get the next task
    if (task) {
      runningTasks.push(task);
      runTask(task);
    }
  };

  for (let i = 0; i < concurrencyLimit && i < tasks.length; i++) {
    startNextTask();
  }
}

Enter fullscreen mode Exit fullscreen mode

Limited concurrency pattern works very well when we have a predetermined set of tasks to execute, or when the set of tasks grows linearly over time.

Limited concurrency with queues


import { EventEmitter } from 'events';

export class TaskQueue {
  constructor (concurrency) {
    super();
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  runTask (task) {
    return new Promise((resolve, reject) => {
      this.queue.push(() => {
        return task().then(resolve, reject);
      });
      process.nextTick(this.next.bind(this));
    });
  }

  next () {
    if (this.running === 0 && this.queue.length === 0) {
      return this.emit("empty");
    }

    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task()
        .catch(err => {
          this.emit("error", err);
        })
        .finally(() => {
          this.running--;
          process.nextTick(this.next.bind(this));
        });
      this.running++;
    }
  }
}

Enter fullscreen mode Exit fullscreen mode
Example of spider web app
Sequential execution and iteration

function spiderLinks (currentUrl, content, nesting, queue) {
  let promise = Promise.resolve();
  if (nesting === 0) {
    return promise;
  }

  const links = getPageLinks(currentUrl, content);
  for (const link of links) {
    promise = promise.then(() =>
      spiderTask(link, nesting - 1, queue));
  }

  return promise;
}

Enter fullscreen mode Exit fullscreen mode
Parallel execution

function spiderLinks (currentUrl, content, nesting, queue) {
  if (nesting === 0) {
    return Promise.resolve();
  }

  const links = getPageLinks(currentUrl, content);
  const promises = links
    .map(link => spiderTask(link, nesting - 1, queue));
  return Promise.all(promises);
}

Enter fullscreen mode Exit fullscreen mode

const spidering = new Set();

function spiderTask (url, nesting, queue) {
  if (spidering.has(url)) {
    return Promise.resolve();
  }

  spidering.add(url);

  const fileName = urlToFilename(url);

  return queue
    .runTask(() => {
      return fsPromises.readFile(filename, 'utf8')
        .catch(err => {
          if (err.code !== 'ENOENT') {
            throw err;
          }

          return download(url, fileName);
        });
    })
    .then(content => spiderLinks(url, content, nesting, queue));
}

export function spider (url, nesting, concurrency) {
  const queue = new TaskQueue(concurrency);
  return spiderTask(url, nesting, queue);
}

Enter fullscreen mode Exit fullscreen mode

Refresher on Promises

new Promise((resolve, reject) => {}) creates a new Promise instance that fulfills or rejects based on the behavior of the function provided as an argument.

Useful Promise methods:

  1. Promise.resolve(obj)
  2. Promise.reject(err)
  3. Promise.all(iterable)
  4. Promise.allSettled(iterable) => returns an object with structure

    
    {
      status: 'fulfilled' | 'rejected';
      reason?: any; // present in case of rejection
      value?: any; // present in case of fulfillment
    }
    
    
  5. Promise.race(iterable)

  6. promise.then(onFulfilled, onRejected)

  7. promise.catch(onRejected)

  8. promise.finally(onFinally)

Refresher on async/await

In an async function, at each await expression, the execution of the function is put on hold, its state saved, and the control returned to the event loop. Once the Promise that has been awaited resolves, the control is given back to the async function, returning the fulfilment value of the Promise.

❗❗ BEWARE


links.forEach(async function iteration(link) {
  await spider(link, nesting - 1);
});

Enter fullscreen mode Exit fullscreen mode

This piece of code will start the tasks in parallel instead of sequential iteration and the execution continues immediately after invoking forEach(), without waiting for all the tasks to complete.

Limited parallel execution with async/await


export class TaskQueuePC {
  constructor (concurrency) {
    this.taskQueue = [];
    this.consumerQueue = [];

    // Spawn as many consumers as the concurrency we want to attain
    for (let i = 0; i < concurrency; i++) {
      this.consumer();
    }
  }

  async consumer () {
    while (true) {
      try {
        const task = await this.getNextTask();
        await task();
      } catch (err) {
        console.error(err);
      }
    }
  }

  async getNextTask () {
    return new Promise((resolve) => {
      if (this.taskQueue.length !== 0) {
        return resolve(this.taskQueue.shift());
      }

      // If queue is empty, then we postpone the resolution of the Promise by queuing the resolve callback into the consumerQueue.
      this.consumerQueue.push(resolve);
    });
  }

  runTask (task) {
    return new Promise((resolve, reject) => {
      const taskWrapper = () => {
        const taskPromise = task();
        taskPromise.then(resolve, reject);
        return taskPromise;
      };

      // If the consumerQueue is not empty, then there is at least one consumer that is asleep, waiting for a new task to run.
      if (this.consumerQueue.length !== 0) {
        const consumer = this.consumerQueue.shift();
        consumer(taskWrapper);
      // If the consumerQueue is empty, means all the consumers are busy
      } else {
        this.taskQueue.push(taskWrappeer);
      }
    });
  }
}

Enter fullscreen mode Exit fullscreen mode

# Tackling a problem of memory leak associated with Promises

Consider the following code snippets:


function leakingLoop () {
  return delay(1)
    .then(() => {
      console.log(`Tick ${Date.now()}`);
      return leakingLoop();
    });
}

async function leakingLoopAsync () {
  await delay(1);
  console.log(`Tick ${Date.now()}`);
  return leakingLoopAsync();
}

Enter fullscreen mode Exit fullscreen mode

This situation creates a chain of promises that never settle, and it will cause a memory leak in Promise implementations.

The solution is to break the chain of Promise resolution while considering error handling from anywehere deep in the recursion as follows:


function nonLeakingLoop () {
  return new Promise((resolve, reject) => {
    (function internalLoop () {
      delay(1)
        .then(() => {
          console.log(`Tick ${Date.now()}`);
          internalLoop();
        })
        .catch(err => {
          reject(err);
        });
    })();
  });
}

Enter fullscreen mode Exit fullscreen mode

Top comments (0)