DEV Community

Connie Leung

Queuing jobs in NestJS using @nestjs/bullmq package

Introduction

Both Bull and BullMQ are queue libraries that persist jobs in Redis. Bull is in maintenance mode, and its maintainers only fix bugs in the library. BullMQ, the new version of the library, is rewritten in TypeScript. The two libraries are similar, except that BullMQ introduces a flow producer that can split a resource-intensive job into child jobs; the parent job processes the results of the child jobs once they have all completed. In NestJS version 10, applications that queue jobs have the option to upgrade to BullMQ and @nestjs/bullmq.

NestJS provides the @nestjs/bullmq package on GitHub, but the official NestJS documentation does not have an example of @nestjs/bullmq and bullmq.

Install dependencies

npm i --save @nestjs/bullmq bullmq

Register Bull Module

First, I used BullModule.forRoot to register BullMQ and connect the package to a Redis instance. Second, I imported the resulting module into the imports array of AppModule. The Redis host and port are the default values, localhost and 6379 respectively.

connection: {
    host: 'localhost',
    port: 6379,
 },
// queue.config.ts

import { BullModule } from '@nestjs/bullmq';

export const queueConfig = BullModule.forRoot({
  connection: {
    host: 'localhost',
    port: 6379,
  },
  defaultJobOptions: {
    removeOnComplete: 1000,
    removeOnFail: 5000,
    attempts: 3,
  },
});
// app.module.ts
import { Module } from '@nestjs/common';
import { queueConfig } from './config/queue.config';

@Module({
  imports: [queueConfig],
})
export class AppModule {}
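
Hard-coding localhost and 6379 is fine for local development. As a minimal sketch, the connection could instead be resolved from environment-style values with the same defaults. The REDIS_HOST and REDIS_PORT names and the resolveRedisConnection helper are assumptions made for illustration; they are not part of @nestjs/bullmq or the original demo.

```typescript
// Hypothetical helper: the variable names REDIS_HOST and REDIS_PORT are assumptions.
interface RedisConnection {
  host: string;
  port: number;
}

function resolveRedisConnection(env: Record<string, string | undefined>): RedisConnection {
  const host = env.REDIS_HOST?.trim() || 'localhost';
  const parsedPort = Number(env.REDIS_PORT);
  // Fall back to the default port when the value is missing or not a valid port number.
  const isValidPort = Number.isInteger(parsedPort) && parsedPort > 0 && parsedPort <= 65535;
  return { host, port: isValidPort ? parsedPort : 6379 };
}
```

The returned object has the same shape as the connection option shown above, so it could be passed as connection: resolveRedisConnection(process.env) in queue.config.ts.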

Register BullMQ queues and Flow Producer with module

In this demo, I registered 4 BullMQ queues and 1 flow producer in a custom queue module. BullModule.registerQueue({ name }) registers a regular queue named name whereas BullModule.registerFlowProducer registers a flow producer named flow.

// queue-board.interface.ts

export interface QueueBoardModuleOptions {
  queues: string[];
  flows?: string[];
}
// queue-board.module-definition.ts

import { ConfigurableModuleBuilder } from '@nestjs/common';
import { QueueBoardModuleOptions } from './queue-board.interface';

export const { ConfigurableModuleClass, MODULE_OPTIONS_TOKEN, OPTIONS_TYPE } =
  new ConfigurableModuleBuilder<QueueBoardModuleOptions>().build();
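// queue-board.module.ts

import { BullModule } from '@nestjs/bullmq';
import { DynamicModule, Module } from '@nestjs/common';
import { ConfigurableModuleClass, OPTIONS_TYPE } from './queue-board.module-definition';
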
@Module({})
export class QueueModule extends ConfigurableModuleClass {
  static register(options: typeof OPTIONS_TYPE): DynamicModule {
    const bullModules = options.queues.map((name) => BullModule.registerQueue({ name }));

    const flowProducers = (options.flows || []).map((flow) =>
      BullModule.registerFlowProducer({
        name: flow,
      }),
    );

    return {
      ...super.register(options),
      imports: [...bullModules, ...flowProducers],
      exports: [...bullModules, ...flowProducers],
    };
  }
}

Register QueueModule in MathModule
QueueModule registers 4 queues (MATH_BINARY, MATH_UNARY, MATH_ARRAY_CHILD and MATH_ARRAY_MERGE) and 1 flow producer (MATH_ARRAY_PRODUCER) through the BullModule of the @nestjs/bullmq package.

// math.module.ts

@Module({
  imports: [
    QueueModule.register({
      queues: [MATH_BINARY, MATH_UNARY, MATH_ARRAY_CHILD, MATH_ARRAY_MERGE],
      flows: [MATH_ARRAY_PRODUCER],
    }),
  ],
  providers: [
    MathBinaryOperationProcessor,
    MathUnaryOperationPocessor,
    MathArrayChildProcessor,
    MathArrayMergeProcessor,
    ArrayFlowService,
  ],
  controllers: [MathController, MathArrayController],
})
export class MathModule {}

Import MathModule into AppModule
// app.module.ts
import { Module } from '@nestjs/common';
import { queueConfig } from './config/queue.config';
import { MathModule } from './math/math.module';
@Module({
  imports: [queueConfig, MathModule],
})
export class AppModule {}

Queuing jobs in NestJS and BullMQ
The code for queuing jobs in NestJS is the same regardless of whether @nestjs/bull or @nestjs/bullmq is used. Use the InjectQueue decorator to inject a queue, then add a job with data. The new job has a job id that the controller method can return to the client. The client can use that job id to check the status and obtain the job's return value.

// inject-queue.decorator.ts
import { InjectQueue } from '@nestjs/bullmq';
import { MATH_BINARY, MATH_UNARY } from '../constants/math.constant';

export const InjectMathBinaryQueue = () => InjectQueue(MATH_BINARY);
export const InjectMathUnaryQueue = () => InjectQueue(MATH_UNARY);
// math.controller.ts

import { Body, Controller, Post } from '@nestjs/common';
import { Queue } from 'bullmq';

@Controller('math')
export class MathController {
  constructor(@InjectMathBinaryQueue() private mathBinaryQueue: Queue) {}

  @Post('sum')
  async sum(@Body() dto: BinaryOperationDto): Promise<string> {
    const job = await this.mathBinaryQueue.add(MATH_BINARY_OPS.SUM, dto);
    return job.id || '';
  }
}
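
The controller above only returns the job id. A minimal sketch of how that id might later be used to look up the status and return value follows. It relies on Queue.getJob, Job.getState and Job.returnvalue from bullmq, but it is written against small structural interfaces so that it runs without Redis. JobLike, QueueLike, JobSummary and describeJob are hypothetical names and are not part of the original demo.

```typescript
// Structural stand-ins for the parts of bullmq's Job and Queue used here.
// Assumption: a real Queue instance should be usable wherever QueueLike is expected.
interface JobLike {
  id?: string;
  returnvalue: unknown;
  getState(): Promise<string>;
}

interface QueueLike {
  getJob(jobId: string): Promise<JobLike | undefined>;
}

interface JobSummary {
  id: string;
  state: string;
  result: unknown;
}

// Looks up a job by id and reports its state; the result is only filled in for completed jobs.
async function describeJob(queue: QueueLike, jobId: string): Promise<JobSummary | undefined> {
  const job = await queue.getJob(jobId);
  if (!job) {
    // Unknown id, or the job has already been removed (see removeOnComplete / removeOnFail).
    return undefined;
  }
  const state = await job.getState();
  return { id: jobId, state, result: state === 'completed' ? job.returnvalue : null };
}
```

A status endpoint could call describeJob(this.mathBinaryQueue, id) and respond with a 404 when the result is undefined.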

Create BullMQ processor
@nestjs/bullmq does not have the @Process() decorator of @nestjs/bull; therefore, a processor class handles named jobs differently. In a BullMQ processor, the process method uses a switch statement to compare the job name and invoke the corresponding job function.

// worker-host.processor.ts

import { OnWorkerEvent, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';

export abstract class WorkerHostProcessor extends WorkerHost {
  protected readonly logger = new Logger(WorkerHostProcessor.name);

  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    const { id, name, queueName, finishedOn, returnvalue } = job;
    const completionTime = finishedOn ? new Date(finishedOn).toISOString() : '';
    this.logger.log(
      `Job id: ${id}, name: ${name} completed in queue ${queueName} on ${completionTime}. Result: ${returnvalue}`,
    );
  }

  @OnWorkerEvent('progress')
  onProgress(job: Job) {
    const { id, name, progress } = job;
    this.logger.log(`Job id: ${id}, name: ${name} completes ${progress}%`);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job) {
    const { id, name, queueName, failedReason } = job;
    this.logger.error(`Job id: ${id}, name: ${name} failed in queue ${queueName}. Failed reason: ${failedReason}`);
  }

  @OnWorkerEvent('active')
  onActive(job: Job) {
    const { id, name, queueName, processedOn } = job;
    // processedOn is set when a worker picks the job up; timestamp is the job's creation time
    const startTime = processedOn ? new Date(processedOn).toISOString() : '';
    this.logger.log(`Job id: ${id}, name: ${name} starts in queue ${queueName} on ${startTime}.`);
  }
}

WorkerHostProcessor extends WorkerHost and registers @OnWorkerEvent handlers for the 'completed', 'progress', 'failed' and 'active' events to log the completion, progress, failure and activation of a job. Other processors can extend WorkerHostProcessor to avoid repeating the @OnWorkerEvent boilerplate.

// binary-operation.dto.ts

import { IsNumber } from 'class-validator';

export class BinaryOperationDto {
  @IsNumber()
  num: number;

  @IsNumber()
  num2: number;
}
// math-binary-ops.enum.ts

export enum MATH_BINARY_OPS {
  SUM = 'SUM',
  SUBTRACT = 'SUBTRACT',
  MULTIPLY = 'MULTIPLY',
  DIVISION = 'DIVISION',
}
// math-binary-operation.processor.ts

import { Processor } from '@nestjs/bullmq';
import { BadRequestException, Injectable } from '@nestjs/common';
import { Job } from 'bullmq';
import { MATH_BINARY } from '../constants/math.constant';
import { WorkerHostProcessor } from './worker-host.processor';

@Processor(MATH_BINARY)
@Injectable()
export class MathBinaryOperationProcessor extends WorkerHostProcessor {
  process(job: Job<BinaryOperationDto, number, string>): Promise<number> {
    const { num, num2 } = job.data;
    switch (job.name) {
      case MATH_BINARY_OPS.SUM:
        return Promise.resolve(num + num2);
    }
    throw new BadRequestException(`Unknown job name: ${job.name}`);
  }
}

@Processor(MATH_BINARY) ensures that the processor takes jobs from the MATH_BINARY queue. In the process method, the switch block matches the job name against MATH_BINARY_OPS.SUM and returns the sum of the 2 number inputs.
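
The processor above only handles MATH_BINARY_OPS.SUM. As a sketch, the other members of the enum could be dispatched in the same switch. The computeBinary function below is a hypothetical, dependency-free stand-in for the body of process; string literals stand in for the enum members, and throwing on division by zero is an assumption about the desired behavior.

```typescript
// Hypothetical stand-in for the switch inside process(); 'SUM', 'SUBTRACT',
// 'MULTIPLY' and 'DIVISION' are the string values of MATH_BINARY_OPS.
function computeBinary(name: string, num: number, num2: number): number {
  switch (name) {
    case 'SUM':
      return num + num2;
    case 'SUBTRACT':
      return num - num2;
    case 'MULTIPLY':
      return num * num2;
    case 'DIVISION':
      if (num2 === 0) {
        // Assumption: treat division by zero as a failed job rather than returning Infinity.
        throw new Error('Division by zero');
      }
      return num / num2;
    default:
      throw new Error(`Unknown job name: ${name}`);
  }
}

console.log(computeBinary('SUM', 4, 3)); // 7
console.log(computeBinary('DIVISION', 6, 3)); // 2
```

Inside a real processor, a thrown error fails the job, so with attempts: 3 in queue.config.ts BullMQ would retry it before marking it as failed.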

Then, register MathBinaryOperationProcessor in the providers array of MathModule.

// math.module.ts

@Module({
  ...
  providers: [
    MathBinaryOperationProcessor,
  ],
  controllers: [MathController],
})
export class MathModule {}

Test BullMQ
I made an HTTP request to sum 2 numbers, and the response is the job id.

curl --location 'http://localhost:3000/math/sum' \
--header 'Content-Type: application/json' \
--data '{
    "num": 4,
    "num2": 3
}'
5

In the terminal, the job completed and logged the answer 7.

[Nest] 33415 - 12/17/2023, 10:19:15 PM LOG [WorkerHostProcessor] Job id: 5, name: SUM starts in queue math-binary on 2023-12-17T14:19:15.168Z.
[Nest] 33415 - 12/17/2023, 10:19:15 PM LOG [WorkerHostProcessor] Job id: 5, name: SUM completed in queue math-binary on 2023-12-17T14:19:15.736Z. Result: 7

Flow Producer
This is a new feature of BullMQ. The idea of a flow producer is to split a heavy job into one or more smaller jobs. When all the smaller jobs finish, their results are available to the parent job, which combines them into the final result.

Register a flow producer

BullModule.registerFlowProducer({ name: flow, })

Inject flow producer
In createFlow method, I split a number array into sub-arrays. Then, I created a child job for each sub-array to run. When these children jobs finish, the parent job in the parent queue, MATH_ARRAY_MERGE, executes to produce the final result.

// array-flow.service.ts

import { Injectable } from '@nestjs/common';
import { FlowChildJob, FlowProducer } from 'bullmq';

const PARTITION_SIZE = 4;

@Injectable()
export class ArrayFlowService {
  constructor(@InjectMathArrayProducer() private mathFlowProducer: FlowProducer) {}

  async createFlow(dto: ArrayOperationDto, jobName: MATH_ARRAY_OPS): Promise<string> {
    const flow = await this.mathFlowProducer.add({
      name: jobName,
      queueName: MATH_ARRAY_MERGE,
      children: this.createChildrenJobs(dto, jobName),
    });
    return flow.job.id || '';
  }

  private createChildrenJobs(dto: ArrayOperationDto, jobName: MATH_ARRAY_OPS) {
    const numPartitions = Math.ceil(dto.data.length / PARTITION_SIZE);
    let startIdx = 0;

    const children: FlowChildJob[] = [];
    for (let i = 0; i < numPartitions - 1; i++) {
      children.push({
        name: jobName,
        data: {
          data: dto.data.slice(startIdx, startIdx + PARTITION_SIZE),
          percentage: (100 / numPartitions) * (i + 1),
        },
        queueName: MATH_ARRAY_CHILD,
      });
      startIdx = startIdx + PARTITION_SIZE;
    }

    children.push({
      name: jobName,
      data: { data: dto.data.slice(startIdx), percentage: 100 },
      queueName: MATH_ARRAY_CHILD,
    });

    return children;
  }
}
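
The partitioning in createChildrenJobs can be checked in isolation. The partition function below is a hypothetical, dependency-free sketch that mirrors the slicing and the percentage arithmetic for a given partition size. It uses the 13-element array from the flow producer test later in this post.

```typescript
interface Partition {
  data: number[];
  percentage: number;
}

// Hypothetical helper mirroring createChildrenJobs without any BullMQ types.
function partition(values: number[], size: number): Partition[] {
  // Like createChildrenJobs, always produce at least one partition, even for an empty input.
  const numPartitions = Math.max(1, Math.ceil(values.length / size));
  const partitions: Partition[] = [];
  for (let i = 0; i < numPartitions; i++) {
    const isLast = i === numPartitions - 1;
    partitions.push({
      // The last partition takes whatever is left over.
      data: isLast ? values.slice(i * size) : values.slice(i * size, (i + 1) * size),
      percentage: isLast ? 100 : (100 / numPartitions) * (i + 1),
    });
  }
  return partitions;
}

const input = [1, 2, 5, -3, 90, 77, -900, 700, 300, 999, -1000, 1099, -2000];
const parts = partition(input, 4);
console.log(parts.map((p) => p.data));
// [[1, 2, 5, -3], [90, 77, -900, 700], [300, 999, -1000, 1099], [-2000]]
console.log(parts.map((p) => p.percentage));
// [25, 50, 75, 100]
```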

Child processor
The child processor takes jobs from the MATH_ARRAY_CHILD queue. When the job name is MATH_ARRAY_OPS.MIN, it returns the minimum element of the array. When the job name is MATH_ARRAY_OPS.MAX, it returns the maximum element. A MATH_ARRAY_OPS.FILTER_ODD job keeps the odd numbers and a MATH_ARRAY_OPS.FILTER_EVEN job keeps the even numbers.

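// math-array-child.processor.ts

import { Processor } from '@nestjs/bullmq';
import { BadRequestException, Injectable } from '@nestjs/common';
import { Job } from 'bullmq';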

@Processor(MATH_ARRAY_CHILD)
@Injectable()
export class MathArrayChildProcessor extends WorkerHostProcessor {
  async process(job: Job<ComparisonJobProgress, number | number[], string>): Promise<number | number[]> {
    switch (job.name) {
      case MATH_ARRAY_OPS.MIN:
        return Math.min(...job.data.data);
      case MATH_ARRAY_OPS.MAX:
        return Math.max(...job.data.data);
      case MATH_ARRAY_OPS.FILTER_ODD:
        // compare against 0 so that negative odd numbers (e.g. -3 % 2 === -1) are kept
        return job.data.data.filter((n) => n % 2 !== 0);
      case MATH_ARRAY_OPS.FILTER_EVEN:
        return job.data.data.filter((n) => n % 2 === 0);
    }

    throw new BadRequestException(`Unknown job name ${job.name} found in queue ${job.queueName}`);
  }
}
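
One detail is worth checking when filtering odd numbers: in JavaScript the remainder operator keeps the sign of the dividend, so -3 % 2 evaluates to -1, not 1. The sketch below compares against 0 so that negative inputs, such as the -3 in the test data of this post, are classified correctly. isOdd and isEven are hypothetical helper names.

```typescript
// Hypothetical predicates; comparing against 0 works for negative numbers as well.
const isEven = (n: number): boolean => n % 2 === 0;
const isOdd = (n: number): boolean => n % 2 !== 0;

const sample = [1, 2, 5, -3];
console.log(sample.filter(isOdd)); // [1, 5, -3]
console.log(sample.filter(isEven)); // [2]
console.log(sample.filter((n) => n % 2 === 1)); // [1, 5] (misses -3)
```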

Producer processor
When all child jobs complete successfully, the parent job in the MATH_ARRAY_MERGE queue receives their return values through Object.values(await job.getChildrenValues()). The parent job then invokes more functions on the children values to yield the final result.

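// math-array-merge.processor.ts

import { Processor } from '@nestjs/bullmq';
import { BadRequestException, Injectable } from '@nestjs/common';
import { Job } from 'bullmq';
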
@Processor(MATH_ARRAY_MERGE)
@Injectable()
export class MathArrayMergeProcessor extends WorkerHostProcessor {
  async process(job: Job<ArrayOperationDto, number | number[], string>): Promise<number | number[]> {
    const results = Object.values(await job.getChildrenValues());
    switch (job.name) {
      case MATH_ARRAY_OPS.MIN:
        return Math.min(...results);
      case MATH_ARRAY_OPS.MAX:
        return Math.max(...results);
      case MATH_ARRAY_OPS.FILTER_ODD:
      case MATH_ARRAY_OPS.FILTER_EVEN:
        return (results as number[][]).flat();
    }

    throw new BadRequestException(`Unknown job name ${job.name}`);
  }
}
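
job.getChildrenValues() resolves to an object whose keys identify the child jobs and whose values are their return values. The sketch below uses hand-written objects in that shape (the keys are made up) and a hypothetical mergeChildrenValues function to show the merge step without Redis. String literals stand in for the MATH_ARRAY_OPS members, which is an assumption about the enum's values.

```typescript
// Hypothetical stand-in for the switch inside MathArrayMergeProcessor.process().
function mergeChildrenValues(op: string, childrenValues: Record<string, unknown>): number | number[] {
  const results = Object.values(childrenValues);
  switch (op) {
    case 'MIN':
      return Math.min(...(results as number[]));
    case 'MAX':
      return Math.max(...(results as number[]));
    case 'FILTER_ODD':
    case 'FILTER_EVEN':
      // Each child returned an array, so flatten one level.
      return (results as number[][]).flat();
    default:
      throw new Error(`Unknown job name ${op}`);
  }
}

// Made-up keys; the values match what the four child jobs return for the MAX example.
const maxChildren = { 'child-1': 5, 'child-2': 700, 'child-3': 1099, 'child-4': -2000 };
console.log(mergeChildrenValues('MAX', maxChildren)); // 1099

const oddChildren = { 'child-1': [1, 5, -3], 'child-2': [77], 'child-3': [999, 1099], 'child-4': [] };
console.log(mergeChildrenValues('FILTER_ODD', oddChildren)); // [1, 5, -3, 77, 999, 1099]
```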

Controller
When a user makes a request to find the minimum or maximum number in an array, a new flow is added through the flow producer.

// math-array.controller.ts

import { Body, Controller, Post } from '@nestjs/common';

@Controller('math-array')
export class MathArrayController {
  constructor(private arrayFlowService: ArrayFlowService) {}

  @Post('min')
  async findMin(@Body() dto: ArrayOperationDto): Promise<string> {
    return this.arrayFlowService.createFlow(dto, MATH_ARRAY_OPS.MIN);
  }

  @Post('max')
  async findMax(@Body() dto: ArrayOperationDto): Promise<string> {
    return this.arrayFlowService.createFlow(dto, MATH_ARRAY_OPS.MAX);
  }
}

Test Flow Producer
I made an HTTP request to find the max element of a number array.

curl --location 'http://localhost:3000/math-array/max' \
--header 'Content-Type: application/json' \
--data '{
    "data": [1,2,5,-3, 90, 77, -900, 700, 300, 999, -1000, 1099, -2000]
}'

The response is a UUID job id.

fecd7af4-31b0-4add-a716-4dd595794332

The flow producer splits the number array into 4 children jobs.

Child job 1 processes [1, 2, 5, -3]
Child job 2 processes [90, 77, -900, 700]
Child job 3 processes [300, 999, -1000, 1099]
Child job 4 processes [-2000]

The child jobs return 5, 700, 1099 and -2000 to the parent job respectively. The parent job invokes Math.max on [5, 700, 1099, -2000] and the final value is 1099.

[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: a1625cf4-15c8-4a9a-bc6f-a89a49f085a2, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.606Z. Result: 5
[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: ed5c95aa-1854-4aa0-92cd-13b64ddf3dd1, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.613Z. Result: 700
[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: f52ed2b2-ee11-46a5-963a-d15023564ed8, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.626Z. Result: 1099
[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: 92aa6a17-c6f4-4a07-87d1-7b19054fbc51, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.647Z. Result: -2000
[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: fecd7af4-31b0-4add-a716-4dd595794332, name: MAX completed in queue math-array-merge on 2023-12-17T15:45:39.872Z. Result: 1099

The log in the terminal shows the same steps leading to the final value, 1099.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in NestJS and other technologies.

Top comments (3)

Brian-Janaszek-Clario

Is it possible to inject a provider into the extended WorkerHost class? I'm attempting to use the "typical" way of doing DI by adding a constructor with the provider injected as an argument, but the provider instance is undefined when I attempt to use, and my constructor doesn't seem to be called.

dami

Some code blocks look broken, maybe it's a malformed markdown?

Connie Leung

Fixed. Thanks.