Introduction
Both Bull
and BullMQ
are queue libraries that persist jobs in Redis. Bull
is in maintenance mode and maintainers only fix bugs in the library. The new version of the library, BullMQ
, is rewritten in TypeScript. Bull
and BullMQ
are similar except BullMQ
introduces flow producer that can split a resource-intensive job into children jobs, and parent job processes the result of the children jobs when they are all completed. Queuing jobs in NestJS has the option to upgrade to BullMQ
and nestjs/bullmq
in version 10.
NestJS has a @nestjs/bullmq
package Github link but the official documentation of NestJS does not have example of @nestjs/bullmq
and bullmq
.
Install dependencies
npm i --save @nestjs/bullmq bullmq
Docker Compose
First, I created a docker-compose.yaml and pulled the latest image of Redis. The redis instance mapped port 6379 of the container to the host's 6379 port.
// docker-compose.yaml
version: '3'
services:
redis2:
image: redis:latest
container_name: redis2
ports:
- '6379:6379'
expose:
- '6379'
networks:
- redis
networks:
redis:
driver: bridge
Register Bull Module
First, I used BullModule.forRoot
to register BullMQ and connect the package to a Redis instance. Second, I imported the BullModule
to the imports array of AppModule
. The Redis host and port are the default values that are localhost and 6379 respectively.
connection: {
host: 'localhost',
port: 6379,
},
// queue.config.ts
import { BullModule } from '@nestjs/bullmq';
export const queueConfig = BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
},
defaultJobOptions: {
removeOnComplete: 1000,
removeOnFail: 5000,
attempts: 3,
},
});
// app.module.ts
import { Module } from '@nestjs/common';
import { queueConfig } from './config/queue.config';
@Module({
imports: [queueConfig],
})
export class AppModule {}
Register BullMQ queues and Flow Producer with module
In this demo, I registered 4 BullMQ queues and 1 flow producer in a custom queue module. BullModule.registerQueue({ name })
registers a regular queue named name whereas BullModule.registerFlowProducer
registers a flow producer named flow.
// queue-board.interface.ts
export interface QueueBoardModuleOptions {
queues: string[];
flows?: string[];
}
// queue-board.module-definition.ts
import { ConfigurableModuleBuilder } from '@nestjs/common';
import { QueueBoardModuleOptions } from './queue-board.interface.';
export const { ConfigurableModuleClass,
MODULE_OPTIONS_TOKEN, OPTIONS_TYPE } =
new ConfigurableModuleBuilder<QueueBoardModuleOptions>().build();
// queue-board.module.ts
@Module({})
export class QueueModule extends ConfigurableModuleClass {
static register(options: typeof OPTIONS_TYPE): DynamicModule {
const bullModules = options.queues.map((name) => BullModule.registerQueue({ name }));
const flowProducers = (options.flows || []).map((flow) =>
BullModule.registerFlowProducer({
name: flow,
}),
);
return {
...super.register(options),
imports: [...bullModules, ...flowProducers],
exports: [...bullModules, ...flowProducers],
};
}
}
Register QueueModule in MathModule
QueueModule
registers 4 queues, MATH_BINARY
, MATH_UNARY
, MATH_ARRAY_CHILD
and MATH_ARRAY_MERGE
, and 1 flow producer MATH_ARRAY_PRODUCER
in BullModule
in @nestjs/bullmq
package.
// math.module.ts
@Module({
imports: [
QueueModule.register({
queues: [MATH_BINARY, MATH_UNARY, MATH_ARRAY_CHILD, MATH_ARRAY_MERGE],
flows: [MATH_ARRAY_PRODUCER],
}),
],
providers: [
MathBinaryOperationProcessor,
MathUnaryOperationPocessor,
MathArrayChildProcessor,
MathArrayMergeProcessor,
ArrayFlowService,
],
controllers: [MathController, MathArrayController],
})
export class MathModule {}
Import MathModule into AppModule
// app.module.ts
import { Module } from '@nestjs/common';
import { queueConfig } from './config/queue.config';
import { MathModule } from './math/math.module';
@Module({
imports: [queueConfig, MathModule],
})
export class AppModule {}
Queuing jobs in NestJS and BullMQ
The codes of queuing jobs in NestJS is the same regardless nestjs/bull
or nestjs/bullmq
is used. Use InjectQueue
decorator to inject a queue and add a job with data. The new job has a job id that the controller method can return to client. Client can use that job id to check status and obtain job value.
// inject-queue.decorator.ts
import { InjectQueue } from '@nestjs/bullmq';
import { MATH_BINARY, MATH_UNARY } from '../constants/math.constant';
export const InjectMathBinaryQueue = () => InjectQueue(MATH_BINARY);
export const InjectMathUnaryQueue = () => InjectQueue(MATH_UNARY);
// math.controller.ts
@Controller('math')
export class MathController {
constructor(@InjectMathBinaryQueue() private mathBinaryQueue: Queue) {}
@Post('sum')
async sum(@Body() dto: BinaryOperationDto): Promise<string> {
const job = await this.mathBinaryQueue.add(MATH_BINARY_OPS.SUM, dto);
return job.id || '';
}
}
Create BullMQ processor
Bullmq does not have @Process()
decorator; therefore, it handles named job differently in processor class. In Bullmq processor, the process
method uses a switch construct to compare job name and invoke the corresponding job function.
// worker-host.process.ts
import { OnWorkerEvent, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
export abstract class WorkerHostProcessor extends WorkerHost {
protected readonly logger = new Logger(WorkerHostProcessor.name);
@OnWorkerEvent('completed')
onCompleted(job: Job) {
const { id, name, queueName, finishedOn, returnvalue } = job;
const completionTime = finishedOn ? new Date(finishedOn).toISOString() : '';
this.logger.log(
`Job id: ${id}, name: ${name} completed in queue ${queueName} on ${completionTime}. Result: ${returnvalue}`,
);
}
@OnWorkerEvent('progress')
onProgress(job: Job) {
const { id, name, progress } = job;
this.logger.log(`Job id: ${id}, name: ${name} completes ${progress}%`);
}
@OnWorkerEvent('failed')
onFailed(job: Job) {
const { id, name, queueName, failedReason } = job;
this.logger.error(`Job id: ${id}, name: ${name} failed in queue ${queueName}. Failed reason: ${failedReason}`);
}
@OnWorkerEvent('active')
onActive(job: Job) {
const { id, name, queueName, timestamp } = job;
const startTime = timestamp ? new Date(timestamp).toISOString() : '';
this.logger.log(`Job id: ${id}, name: ${name} starts in queue ${queueName} on ${startTime}.`);
}
}
WorkerHostProcessor
extends WorkerHost
and overrides @OnWorkEvent('completed')
, @OnWorkerEvent('progress')
and @OnWorkerEvent('active')
to log the completion, progress and activation of a job. Other processors can extend WorkerHostProcessor
to avoid writing boilerplate @OnWorkerEvent
.
// binary-operation.dto
import { IsNumber } from 'class-validator';
export class BinaryOperationDto {
@IsNumber()
num: number;
@IsNumber()
num2: number;
}
// math-binary-ops.enum.ts
export enum MATH_BINARY_OPS {
SUM = 'SUM',
SUBTRACT = 'SUBTRACT',
MULTIPLY = 'MULTIPLY',
DIVISION = 'DIVISION',
}
// math-binary-operation.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { MATH_BINARY } from '../constants/math.constant';
import { WorkerHostProcessor } from './worker-host.processor';
@Processor(MATH_BINARY)
@Injectable()
export class MathBinaryOperationProcessor extends WorkerHostProcessor {
process(job: Job<BinaryOperationDto, number, string>): Promise<number> {
const { num, num2 } = job.data;
switch (job.name) {
case MATH_BINARY_OPS.SUM:
return Promise.resolve(num + num2);
}
throw new BadRequestException(`Unknown job name: ${job.name}`);
}
}
@Processor(MATH_BINARY)
ensures that the processor takes jobs from MATH_BINARY queue to complete. In process method, the switch block matches job name to MATH_BINARY_OPS.SUM
and returns the sum of 2 number inputs.
Then, import MathBinaryOperationProcessor
in the providers array of MathModule
// math.module.ts
@Module({
...
providers: [
MathBinaryOperationProcessor,
],
controllers: [MathController],
})
export class MathModule {}
Test BullMq
I made a HTTP request to sum 2 numbers together and the response is the job id
curl --location 'http://localhost:3000/math/sum' \
--header 'Content-Type: application/json' \
--data '{
"num": 4,
"num2": 3
}'
5
In the terminal, the job completed and logged the answer 7.
[Nest] 33415 - 12/17/2023, 10:19:15 PM LOG [WorkerHostProcessor] Job id: 5, name: SUM starts in queue math-binary on 2023-12-17T14:19:15.168Z.
[Nest] 33415 - 12/17/2023, 10:19:15 PM LOG [WorkerHostProcessor] Job id: 5, name: SUM completed in queue math-binary on 2023-12-17T14:19:15.736Z. Result: 7
Flow Producer
This is a new feature of BullMQ. The idea of flow producer is to split a heavy job into one or more smaller jobs to process. When all the smaller jobs finish, the results are available to the parent job to combine them into the final result
Register a flow producer
BullModule.registerFlowProducer({ name: flow, })
Inject flow producer
In createFlow
method, I split a number array into sub-arrays. Then, I created a child job for each sub-array to run. When these children jobs finish, the parent job in the parent queue, MATH_ARRAY_MERGE
, executes to produce the final result.
// array-flow.service.ts
const PARTITION_SIZE = 4;
@Injectable()
export class ArrayFlowService {
constructor(@InjectMathArrayProducer() private mathFlowProducer: FlowProducer) {}
async createFlow(dto: ArrayOperationDto, jobName: MATH_ARRAY_OPS): Promise<string> {
const flow = await this.mathFlowProducer.add({
name: jobName,
queueName: MATH_ARRAY_MERGE,
children: this.createChildrenJobs(dto, jobName),
});
return flow.job.id || '';
}
private createChildrenJobs(dto: ArrayOperationDto, jobName: MATH_ARRAY_OPS) {
const numPartitions = Math.ceil(dto.data.length / PARTITION_SIZE);
let startIdx = 0;
const children: FlowChildJob[] = [];
for (let i = 0; i < numPartitions - 1; i++) {
children.push({
name: jobName,
data: {
data: dto.data.slice(startIdx, startIdx + PARTITION_SIZE),
percentage: (100 / numPartitions) * (i + 1),
},
queueName: MATH_ARRAY_CHILD,
});
startIdx = startIdx + PARTITION_SIZE;
}
children.push({
name: jobName,
data: { data: dto.data.slice(startIdx), percentage: 100 },
queueName: MATH_ARRAY_CHILD,
});
return children;
}
}
Child processor
The child processor takes jobs from the MATH_ARRAY_CHILD
queue to process. When job name is MATH_ARRAY_OPS.MIN
, it returns the minimum element of an arbitrary array. When job name is MATH_ARRAY_OPS.MAX
, it returns the maximum element of an array. MATH_ARRAY_OPS.FILTER_ODD
job filters odd numbers and MATH_ARRAY_OPS.FILTER_EVEN
job filters even numbers.
// math-array-child.processor.ts
@Processor(MATH_ARRAY_CHILD)
@Injectable()
export class MathArrayChildProcessor extends WorkerHostProcessor {
async process(job: Job<ComparisonJobProgress, number | number[], string>): Promise<number | number[]> {
switch (job.name) {
case MATH_ARRAY_OPS.MIN:
return Math.min(...job.data.data);
case MATH_ARRAY_OPS.MAX:
const maxResult = Math.max(...job.data.data);
case MATH_ARRAY_OPS.FILTER_ODD:
return job.data.data.filter((n) => n % 2 === 1);
case MATH_ARRAY_OPS.FILTER_EVEN:
return job.data.data.filter((n) => n % 2 === 0);
}
throw new BadRequestException(`Unknown job name ${job.name} found in queue ${job.queueName}`);
}
}
Producer processor
When all children jobs complete successfully, parent job in the MATH_ARRAY_MERGE queue
receives the children values through Object.values(await job.getChildrenValues())
. Then, the parent job invokes more functions on the children values to yield the final result.
// math-array-merge.process.ts
@Processor(MATH_ARRAY_MERGE)
@Injectable()
export class MathArrayMergeProcessor extends WorkerHostProcessor {
async process(job: Job<ArrayOperationDto, number | number[], string>): Promise<number | number[]> {
const results = Object.values(await job.getChildrenValues());
switch (job.name) {
case MATH_ARRAY_OPS.MIN:
return Math.min(...results);
case MATH_ARRAY_OPS.MAX:
return Math.max(...results);
case MATH_ARRAY_OPS.FILTER_ODD:
case MATH_ARRAY_OPS.FILTER_EVEN:
return (results as number[][]).flat();
}
throw new BadRequestException(`Unknown job name ${job.name}`);
}
}
Controller
When user makes a request to find the minimum/maximum number in an array, a new flow is appended to the flow producer
// math-array.controller.ts
@Controller('math-array')
export class MathArrayController {
constructor(private arrayFlowService: ArrayFlowService) {}
@Post('min')
async findMin(@Body() dto: ArrayOperationDto): Promise<string> {
return this.arrayFlowService.createFlow(dto, MATH_ARRAY_OPS.MIN);
}
@Post('max')
async findMax(@Body() dto: ArrayOperationDto): Promise<string> {
return this.arrayFlowService.createFlow(dto, MATH_ARRAY_OPS.MAX);
}
}
Test Flow Producer
I made a HTTP request to find the max element of a number array
curl --location 'http://localhost:3000/math-array/max' \
--header 'Content-Type: application/json' \
--data '{
"data": [1,2,5,-3, 90, 77, -900, 700, 300, 999, -1000, 1099, -2000]
}'
The response is a UUID job id
fecd7af4-31b0-4add-a716-4dd595794332
The flow producer splits the number array into 4 children jobs.
Children job 1 processes [1,2,5,-3]
Children job 2 processes [90, 77, -900, 700]
Children job 3 processes [300, 999, -1000, 1099]
Children job 4 processes [-2000]
Children jobs return 5, 700, 1099 and -2000 to the parent job respectively. The parent job invokes Math.max on [5, 700, 1099, -2000] and the final value is 1099.
[Nest] 36584 - 12/17/2023, 11:45:39 PM LOG [WorkerHostProcessor] Job id: a1625cf4-15c8-4a9a-bc6f-a89a49f085a2, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.606Z. Result: 5
[Nest] 36584 - 12/17/2023, 11:45:39 PM LOG [WorkerHostProcessor] Job id: ed5c95aa-1854-4aa0-92cd-13b64ddf3dd1, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.613Z. Result: 700
[Nest] 36584 - 12/17/2023, 11:45:39 PM LOG [WorkerHostProcessor] Job id: f52ed2b2-ee11-46a5-963a-d15023564ed8, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.626Z. Result: 1099
[Nest] 36584 - 12/17/2023, 11:45:39 PM LOG [WorkerHostProcessor] Job id: 92aa6a17-c6f4-4a07-87d1-7b19054fbc51, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.647Z. Result: -2000
[Nest] 36584 - 12/17/2023, 11:45:39 PM LOG [WorkerHostProcessor] Job id: fecd7af4-31b0-4add-a716-4dd595794332, name: MAX completed in queue math-array-merge on 2023-12-17T15:45:39.872Z. Result: 1099
The log in the terminal also describes the same steps to land the final value, 1099.
This is the end of the blog post and I hope you like the content and continue to follow my learning experience in NestJS and other technologies.
Resources:
- Github Repo: https://github.com/railsstudent/nestjs-bullmq-demo
- BullMq: https://docs.bullmq.io/
- Named processor in BullMq: https://docs.bullmq.io/patterns/named-processor
Top comments (3)
Connie Leung, registered an account first time just to say huge thank you for sharing. The @nestjs/bullmq package is so non-intuitive to use, from outdated documentation, to lack of practical examples. I've struggled so much before found your article ✨
Thanks. My company uses it and I wrote down what my teams did.
Thanks for sharing ⭐