Redis Streams is a feature that allows you to model data as a log or a stream of events. You can use it for various purposes, including event-driven architecture, message queuing, and real-time data processing. Here's an example of how you can use Redis Streams to implement a simple message queue:
const redis = require('redis');
const { promisify } = require('util');
const redisClient = redis.createClient();
const xaddAsync = promisify(redisClient.xadd).bind(redisClient);
const xreadgroupAsync = promisify(redisClient.xreadgroup).bind(redisClient);
const STREAM_NAME = 'message_stream';
const CONSUMER_GROUP_NAME = 'message_consumers';
// Create a stream (if it doesn't exist)
async function createStreamIfNotExists() {
try {
await redisClient.xgroup('CREATE', STREAM_NAME, CONSUMER_GROUP_NAME, '$', 'MKSTREAM');
} catch (err) {
// Ignore if the stream already exists
if (!err.message.includes('BUSYGROUP Consumer Group name already exists')) {
throw err;
}
}
}
// Produce a message to the stream
async function produceMessage(message) {
const messageId = await xaddAsync(STREAM_NAME, '*', 'message', message);
console.log(`Produced message with ID: ${messageId}`);
}
// Consume messages from the stream
async function consumeMessages() {
const consumerName = 'consumer-1';
while (true) {
try {
const messages = await xreadgroupAsync(
'GROUP',
CONSUMER_GROUP_NAME,
consumerName,
'BLOCK',
0,
'COUNT',
10,
'STREAMS',
STREAM_NAME,
'>',
);
for (const [stream, messageData] of messages) {
for (const [messageId, message] of messageData) {
console.log(`Received message with ID ${messageId}: ${message}`);
// Process the message here
// Acknowledge the message to remove it from the stream
await redisClient.xack(STREAM_NAME, CONSUMER_GROUP_NAME, messageId);
}
}
} catch (err) {
console.error('Error consuming messages:', err);
}
}
}
(async () => {
await createStreamIfNotExists();
// Start consuming messages
consumeMessages();
// Produce some example messages
for (let i = 1; i <= 10; i++) {
await produceMessage(`Message ${i}`);
await new Promise((resolve) => setTimeout(resolve, 1000)); // Delay between messages
}
})();
In this example:
We create a Redis stream named
message_stream
and a consumer group namedmessage_consumers
.The
produceMessage
function is used to produce messages to the stream, and theconsumeMessages
function consumes messages from the stream.Messages are produced to the stream with unique IDs, and consumers can read and acknowledge (ack) them. Once a message is acknowledged, it's considered processed and removed from the stream.
The consumer script runs continuously, waiting for new messages to arrive in the stream. It processes each message and acknowledges it.
Redis Streams is a versatile feature that can be used for more complex use cases beyond simple message queues, including real-time event processing and log aggregation. You can customize and extend this example to fit your specific requirements.
Top comments (0)