DEV Community

Jarosław Szutkowski
How to Delay RabbitMQ Message Consumption in Just a Few Easy Steps

Consider the following situation: after performing some action, we send a message to a queue. While processing it, the consumer makes a request to an external API, and the API responds with a 5xx code. In that case it makes sense to repeat the request after a while, once the error on the API provider's side has hopefully cleared, so that we can complete it. How can we do that?

RabbitMQ lets us delay message consumption in two ways. The first requires installing an additional plugin, which is available since RabbitMQ 3.5.3. The second is a kind of workaround and doesn't require any additional tools.

RabbitMQ Delayed Message Plugin

The plugin adds a new exchange type: x-delayed-message. We publish messages to it with an x-delay header whose value is the delay in milliseconds. Once that time has passed, the message is routed to the bound queue.

When declaring the exchange, we pass an x-delayed-type argument with the type of the underlying exchange, e.g. direct or topic, which decides how messages are routed after the delay. This argument is required.

After being published, delayed messages are stored in Mnesia, the database that ships with the Erlang runtime RabbitMQ runs on. They are kept in a single disk replica on the current node. RAM nodes are not supported in this case.

Pros:

  • a dedicated exchange supporting message delay
  • potentially suitable for production use as long as the user is aware of its limitations

Cons:

  • the plugin is still considered experimental
  • it has to be installed separately
  • works only with newer RabbitMQ versions
  • a bit slower than the native solution

Example:

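use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// HOST, PORT, USER, PASS and VHOST are placeholders for your connection settings
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();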

$channel->queue_declare(
    queue: 'example_queue',
    durable: true,
    auto_delete: false
);

$channel->exchange_declare(
    exchange: 'example_exchange',
    type: 'x-delayed-message',
    durable: true,
    auto_delete: false,
    arguments: new AMQPTable(
        [
            'x-delayed-type' => 'direct'
        ]
    )
);

$channel->queue_bind('example_queue', 'example_exchange');

// x-delay is given in milliseconds: this message is delivered after 10 seconds
$message = new AMQPMessage('test', [
    'application_headers' => new AMQPTable(['x-delay' => 10000])
]);

$channel->basic_publish($message, 'example_exchange');
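
To tie this back to the failing API scenario, a consumer could republish a failed message to the delayed exchange with a growing x-delay. The following is only a sketch: the function names retryDelayMs and republishWithDelay, the x-attempt header, and the base and cap values are assumptions made for illustration. None of them are part of the plugin or of php-amqplib.

```php
<?php

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Hypothetical helper: delay in milliseconds before the given retry
 * attempt (1-based). Doubles from $baseMs and is capped at $maxMs.
 */
function retryDelayMs(int $attempt, int $baseMs = 1000, int $maxMs = 60000): int
{
    $attempt = max(1, $attempt);
    // Cap the exponent so the multiplication cannot overflow
    $factor = 2 ** min($attempt - 1, 20);

    return (int) min($maxMs, $baseMs * $factor);
}

/**
 * Hypothetical helper: publish the body again to the delayed exchange
 * declared in the example above. x-attempt is an invented header that
 * carries the attempt number for the next consumer run.
 */
function republishWithDelay(AMQPChannel $channel, string $body, int $attempt): void
{
    $message = new AMQPMessage($body, [
        'application_headers' => new AMQPTable([
            'x-delay' => retryDelayMs($attempt),
            'x-attempt' => $attempt,
        ]),
    ]);

    $channel->basic_publish($message, 'example_exchange');
}
```

With these defaults the first retry waits 1 second, the second 2 seconds, the third 4 seconds, and so on, up to a maximum of 60 seconds.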

Dead Letter Queue

This solution needs no plugin, because dead lettering and message TTL are built into RabbitMQ. It involves creating two queues.

The first one is created with three parameters:

  • x-dead-letter-exchange
  • x-dead-letter-routing-key
  • x-message-ttl

We publish messages to this queue, but we don't consume them from it.

The x-message-ttl parameter defines how long, in milliseconds, a message is kept in that queue. After that time the message is moved to the exchange declared in the x-dead-letter-exchange argument, with the routing key declared in x-dead-letter-routing-key.

As a result, all messages that expire in the first queue are passed on to the second one, which is the queue we actually consume from.
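
When RabbitMQ dead-letters a message, it adds an x-death header describing the queues the message was dead-lettered from. A consumer of the second queue could use that header to limit the number of retries. The sketch below assumes the headers have already been converted to a plain PHP array (in php-amqplib, AMQPTable::getNativeData() does this). The function name expiredCount is hypothetical and the sample header values are invented.

```php
<?php

/**
 * Hypothetical helper: how many times a message expired in $queue,
 * based on the x-death header RabbitMQ adds when dead-lettering.
 *
 * @param array $headers application headers as a plain PHP array
 */
function expiredCount(array $headers, string $queue): int
{
    $total = 0;

    foreach ($headers['x-death'] ?? [] as $death) {
        $sameQueue = ($death['queue'] ?? null) === $queue;
        $expired = ($death['reason'] ?? null) === 'expired';

        if ($sameQueue && $expired) {
            // Fall back to 1 if the entry carries no count field
            $total += (int) ($death['count'] ?? 1);
        }
    }

    return $total;
}

// Example header shape (values invented for illustration)
$headers = [
    'x-death' => [
        ['queue' => 'delay_queue', 'reason' => 'expired', 'count' => 3],
        ['queue' => 'other_queue', 'reason' => 'rejected', 'count' => 1],
    ],
];

echo expiredCount($headers, 'delay_queue'), PHP_EOL; // prints 3
```

A consumer could compare this number with a retry limit of its own and give up, or move the message elsewhere, once the limit is reached.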

Pros:

  • does not require installing additional plugins
  • native solution
  • works also on older RabbitMQ versions

Cons:

  • requires creating duplicated queues
  • changing the TTL requires recreating the queue

Example:

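use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// HOST, PORT, USER, PASS and VHOST are placeholders for your connection settings
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();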

$channel->queue_declare(
    queue: 'destination_queue',
    durable: true,
    auto_delete: false
);

$channel->queue_declare(
    queue: 'delay_queue',
    durable: true,
    auto_delete: false,
    arguments: new AMQPTable(
        [
            'x-dead-letter-exchange' => 'delayed_exchange',
            'x-dead-letter-routing-key' => 'destination_queue',
            'x-message-ttl' => 10000
        ]
    )
);

$channel->exchange_declare(
    exchange: 'delayed_exchange',
    type: AMQPExchangeType::DIRECT,
    durable: true,
    auto_delete: false
);

$channel->queue_bind('delay_queue', 'delayed_exchange', 'delay_queue');

$channel->queue_bind('destination_queue', 'delayed_exchange', 'destination_queue');

$message = new AMQPMessage('test');

$channel->basic_publish($message, 'delayed_exchange', 'delay_queue');
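
Since changing the queue TTL requires recreating the queue, a possible variation is to set the TTL per message through the expiration property, which RabbitMQ expects as a string of milliseconds. This is only a sketch under that assumption: the helper names delayProperties and delayedMessage are hypothetical. Two caveats apply. If the queue also has x-message-ttl, the lower of the two values wins. Messages are also only dead-lettered once they reach the head of the queue, so a short-lived message can be held up behind one with a longer expiration.

```php
<?php

use PhpAmqpLib\Message\AMQPMessage;

/**
 * Hypothetical helper: message properties carrying a per-message TTL.
 * The expiration property is a string holding the TTL in milliseconds.
 */
function delayProperties(int $delayMs): array
{
    if ($delayMs < 0) {
        throw new InvalidArgumentException('Delay must not be negative');
    }

    return ['expiration' => (string) $delayMs];
}

/**
 * Hypothetical helper: build a message that expires after $delayMs
 * in the queue it is routed to.
 */
function delayedMessage(string $body, int $delayMs): AMQPMessage
{
    return new AMQPMessage($body, delayProperties($delayMs));
}
```

Such a message would be published just like the one above, e.g. by passing delayedMessage('test', 5000) to basic_publish with the delayed_exchange exchange and the delay_queue routing key.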

Summary

Both solutions are easy to use, and each has its advantages and disadvantages. With the first one we get a dedicated exchange, but we have to install an additional plugin. The plugin is considered fairly stable for production use, though.

The second solution is native and faster. However, it requires creating additional queues and is therefore less transparent than the first one.
