While BullMQ is a powerful message queue which also can be implemented in Nestjs easily, it does not support tls connection or redis cluster very well.
I came across to small troubles when I was trying to use AWS MemoryDB with BullMQ. However, Using AWS MemoryDB instead of local redis server has two big difference.
1) uses tls
2) configured as a cluster
I'll go through these issues step by step. For those who aren't familiar with BullMQ in Nestjs, you can read BullMQ's Nestjs guide which is already pretty good.
TLS Connection
First big problem is that BullMQ does not provide a way to use tls connection, or a connection uri starting with rediss://
.
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
tls: {
...
},
},
prefix: 'bull',
})
host
field only accepts the host part of the uri, and doesn't want to know about protocol or other query string options. You can still provide tls
object, but you need certificates which is not quite necessary for connecting MemoryDB.
Luckily, connection
field also accepts IORedis connection object, and IORedis connection object does provide a way to use connection string.
So, install ioredis
and create a connection object.
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: ConfigService) => {
const uri = config.get<string>('redisUri');
const connection = new IORedis(uri, {
// you'll get some warnings if not set to null
maxRetriesPerRequest: null,
});
return {
connection,
prefix: 'bull',
};
},
inject: [ConfigService],
}),
Also, you can do the same with listeners.
async addQueue(data: YourData): Promise<JobResult> {
const job = await this.queue.add(JOB_NAME, data, options);
const listener = this.getListener();
return job.waitUntilFinished(listener);
}
private getListener() {
const uri = this.env.get<string>('redisUri');
const connection = new IORedis(uri, { maxRetriesPerRequest: null });
return new QueueEvents(QUEUE_NAME, { connection });
}
Hash Slots
Now we made a connection to AWS MemoryDB. But then, it is very likely to get an error saying,
Worker error ReplyError: CROSSSLOT Keys in request don't hash to the same slot
To understand what this error means, you first have to know how Redis Cluster stores many keys across its nodes. Redis Cluster implements a new identifier called slots
. Each key is given a slot number, and this slot number is used as an index to find which key-value is in which node.
The problem is, redis transaction is only possible between the keys with same slots. Meaning you cannot make a transaction with key3
and key4
from the image above, although they are in the same node. And this slot numbers are assigned by some hash algorithm based on the key. The resulted slots have maximum number up to 16384, still different keys are likely to have different slots assigned.
One way to control these assigned slots is to use hash tags
. Hash tags is a part of key covered in brackets{}
. For example, user1:session
and user1:profile
is not hash-tagged, so they are likely to get different hash slots. But, {user1}:session
and {user1}:profile
have hash tag string - user1
, and this string is only hashed to get the slot. As a result, two keys are stored under same slot.
If you have a single queue within the same BullMQ connection, then hash tagging prefix like the BullMQ docs is good enough.
return {
connection,
prefix: '{bull}',
};
But if you use several queues with the same Redis connection, you can also hash tag queue names. In this case, I recommend creating a constant
as queue name is quite frequently used across the code.
// const.ts
export const QueueConfig = {
QUEUE_NAME: '{prompt}',
} as const;
// prompt.module.ts
@Module({
imports: [
BullModule.registerQueue({
name: QueueConfig.QUEUE_NAME,
defaultJobOptions: { ... },
}),
],
...
})
export class PromptModule {}
// prompt.queue.ts
@Injectable()
@QueueEventsListener(QueueConfig.QUEUE_NAME)
export class PromptQueue extends QueueEventsHost {
constructor(
@InjectQueue(QueueConfig.QUEUE_NAME) private queue: Queue<PromptImageForm>,
) {
super();
}
async add() { ... }
}
// prompt.processor.ts
@Processor(QueueConfig.QUEUE_NAME, { ... })
export class PromptProcessor extends WorkerHost { ... }
Top comments (0)