DEV Community

Genne23v

Create BullMQ Workflow

I think one of the most challenging parts of the Starchart app is that it needs to support long-running asynchronous jobs. The app adds a record to the DB and creates a record in Route53. Then it needs to get an HTTPS certificate. This process takes at least a few minutes to complete. There were multiple technical approaches to the problem, and we chose the BullMQ queue system. I hadn't tried any queue system before, so it was a good opportunity to learn how to use BullMQ, especially Flows.

BullMQ setup

Before using BullMQ, a couple of dependencies are required.

npm i bullmq ioredis

You will also need a Redis instance to test locally. Running the official Redis Docker image in a container works well.

BullMQ Flows

I used BullMQ Flows because they suit our need for jobs that depend on each other to run in order. What needs to happen is:

  1. Add a record to MySQL database
  2. Create a record in Route53
  3. Continuously check Route53 to see whether the record has been deployed successfully or has failed. According to AWS, this takes about 1 minute
  4. Update the Route53 sync status in DB
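
In a BullMQ flow, a parent job is not processed until all of its children have completed, so the steps above end up as a tree rather than a plain list. The following is a minimal sketch of that ordering, using the job names from the flow in this article. `FlowNode` and `executionWaves` are hypothetical helpers written for illustration and are not part of BullMQ, and the "waves" only show the earliest point at which each job can start, not real timing.

```typescript
// Hypothetical, simplified stand-in for a flow node (not BullMQ's FlowJob type).
interface FlowNode {
  name: string;
  children?: FlowNode[];
}

// Group jobs into waves: a job lands one wave after its deepest child,
// so every job in a wave has all of its children in earlier waves.
function executionWaves(root: FlowNode): string[][] {
  const waves: string[][] = [];
  const visit = (node: FlowNode): number => {
    const childWaves = (node.children ?? []).map(visit);
    const wave = childWaves.length === 0 ? 0 : Math.max(...childWaves) + 1;
    if (!waves[wave]) waves[wave] = [];
    waves[wave].push(node.name);
    return wave;
  };
  visit(root);
  return waves;
}

// Same shape as the flow in this article: the last step is the root parent.
const dnsFlow: FlowNode = {
  name: 'syncDbStatus',
  children: [
    { name: 'addDbRecord' },
    { name: 'checkDnsStatus', children: [{ name: 'createDnsRecord' }] },
  ],
};

const order = executionWaves(dnsFlow);
// order is [['addDbRecord', 'createDnsRecord'], ['checkDnsStatus'], ['syncDbStatus']]
```

Note that, in this shape, `addDbRecord` and `createDnsRecord` have no dependency on each other, so the flow itself does not force one to finish before the other starts.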

Let's write a flow for these jobs.

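import { FlowProducer } from 'bullmq';
import type { FlowJob } from 'bullmq';

// `redis` (the shared ioredis connection) and `JobRecord` (the job payload type) are defined elsewhere in the app
const flowProducer = new FlowProducer({ connection: redis });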

export const addDnsRequest = async ({ username, type, name, value }: JobRecord) => {
  // Step 1. Create a record in MySQL for a domain with pending status
  const addDbRecord: FlowJob = {
    name: `addDbRecord:${name}-${username}`,   //Set up unique name for each job
    queueName: 'add-db-record',      //Set a queue name. It will be used as a part of job name unless you configure it manually
    data: { username, type, name, value } as JobRecord,
    opts: {
      failParentOnFailure: true,    //If any one of the job fails, the whole flow will fail
      attempts: 5,    //Retry 5 times
      backoff: {      //Wait for 15 seconds when retrying. Each retry wait will be exponentially increased.
        type: 'exponential',
        delay: 15_000,
      },
    },
  };

  // Step 2. Request Route53 to create a record
  const createDnsRecord: FlowJob = {
    name: `createDnsRecord:${name}-${username}`,
    queueName: 'create-dns-record',    //Must match the queue name the worker listens on
    data: { username, type, name, value } as JobRecord,
    opts: {
      failParentOnFailure: true,
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 15_000,
      },
    },
  };

  // Step 3. Poll Route53 to check connection status of the domain until it's ready
  const checkDnsStatus: FlowJob = {
    name: `checkDnsStatus:${name}-${username}`,
    queueName: checkDnsStatusQueueName,
    children: [createDnsRecord],    //With this dependency, parent job will not move to queue until child job is processed
    opts: {
      failParentOnFailure: true,
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 60_000,
      },
    },
  };

  // Step 4. Update the MySQL record with the active or error status
  const syncDbStatus: FlowJob = {
    name: `syncDbStatus:${name}-${username}`,
    queueName: syncDbStatusQueueName,
    children: [addDbRecord, checkDnsStatus],    //This job needs two children jobs to be completed
    opts: {
      failParentOnFailure: true,
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 30_000,
      },
    },
  };

  return await flowProducer.add(syncDbStatus);    //All of the jobs are added to the flow by adding the last parent job
};
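
To get a feel for what `attempts: 5` with an exponential backoff of `delay: 15_000` means in practice, here is a small sketch that computes the waits between retries. It assumes BullMQ's built-in exponential strategy waits `delay * 2^(attemptsMade - 1)` milliseconds, which is my understanding of the library's behaviour; `exponentialBackoffDelays` is a hypothetical helper, not a BullMQ function.

```typescript
// Assumed formula for the built-in 'exponential' backoff:
// delay * 2^(attemptsMade - 1), where attemptsMade counts the tries so far.
function exponentialBackoffDelays(attempts: number, delay: number): number[] {
  const delays: number[] = [];
  // With `attempts` total tries there are at most attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(delay * 2 ** (attemptsMade - 1)));
  }
  return delays;
}

// Settings used by addDbRecord and createDnsRecord in the flow above.
const retryDelays = exponentialBackoffDelays(5, 15_000);
// retryDelays is [15000, 30000, 60000, 120000]

// Total time spent waiting if every attempt fails: 225000 ms (3 min 45 s).
const totalWaitMs = retryDelays.reduce((sum, d) => sum + d, 0);
```

Under the same assumption, the 60-second base delay on `checkDnsStatus` gives waits of 1, 2, 4 and 8 minutes, which is worth keeping in mind when choosing how long to poll Route53.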

Set up workers for each queue

Now we need to set up a Worker to execute these jobs. Since most of the workers are similar, I will just add the most useful example.

import { Worker } from 'bullmq';

export const createDnsRecordWorker = new Worker<JobRecord>(
  'create-dns-record',    //Define a queue for the worker
  async (job) => {
    const { username, type, name, value } = job.data;    //Get data fed from the queue. 

    try {
      return await createRecord(username, type, name, value);    //await so that a rejected promise is caught and logged below
    } catch (error) {
      logger.warn('Could not create a record in Route53', error);
      throw error;
    }
  },
  { connection: redis }
);

You can customize your worker based on your needs. When a failure in one job should fail the whole flow without further retries, throw an UnrecoverableError.
You can also read the values returned by multiple child jobs, as below.

const values: { [jobKey: string]: string } = await job.getChildrenValues();
const key = Object.keys(values)[0];
const value = values[key];
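
Taking the first key works when a job has a single child, but a parent such as `syncDbStatus` has two. One option is to pick the value by the child's queue name. The sketch below assumes the keys returned by `getChildrenValues()` look like `<prefix>:<queueName>:<jobId>` (for example `bull:create-dns-record:42`) and that queue names contain no colon. `childValueFromQueue` is a hypothetical helper, and the `check-dns-status` queue name and the sample values are made up for illustration.

```typescript
// Hypothetical helper: return the first child value whose key belongs to the given queue.
// Assumes keys of the form "<prefix>:<queueName>:<jobId>".
function childValueFromQueue<T>(
  values: Record<string, T>,
  queueName: string
): T | undefined {
  for (const [jobKey, value] of Object.entries(values)) {
    const parts = jobKey.split(':');
    if (parts[1] === queueName) {
      return value;
    }
  }
  return undefined;
}

// Sample data shaped like the assumed result of job.getChildrenValues().
const sampleValues: Record<string, string> = {
  'bull:add-db-record:1': 'record-created',
  'bull:check-dns-status:2': 'INSYNC',
};

const dnsStatus = childValueFromQueue(sampleValues, 'check-dns-status');
const missing = childValueFromQueue(sampleValues, 'no-such-queue');
```

Looking values up by queue name avoids relying on the order of the keys, which is not something the flow definition guarantees.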

Conclusion

I had no prior experience with queue systems. By using BullMQ, we are able to solve many problems, such as sending scheduled or notification emails and creating HTTPS certificates. I'm sure there are many more problems that can be solved with a queue system. I hope this article helps you start writing your own workflow.

Top comments (1)

Denys Kryvoshei • Edited

Nice article, thanks.

I guess there's a typo in the addDnsRequest function: your return statement is out of scope. It should be something like:

const flowProducer = new FlowProducer({ connection: redis });

export const addDnsRequest = async ({ username, type, name, value }: JobRecord) => {
  ................
  // Step 4. Update the MySQL record with the active or error status
  const syncDbStatus: FlowJob = {
    name: `syncDbStatus:${name}-${username}`,
    ...........
  };
  return await flowProducer.add(syncDbStatus);  
};