Consider the following situation: after performing some action, we send a message to a queue. While processing it, the consumer makes a request to an external API, and the API unexpectedly responds with a 5xx code. In that case it would be good to repeat the request after a while, once the error on the API provider's side has hopefully gone away, so that we can complete it. How can we do that?
In RabbitMQ we can delay message consumption. There are two ways of doing it. The first one requires installing an additional plugin, which is available for RabbitMQ 3.5.3 and later. The second one is a kind of workaround and doesn't require any additional tools.
RabbitMQ Delayed Message Plugin
The plugin adds a new exchange type: x-delayed-message. We send messages to it with an x-delay header whose value is the delay in milliseconds; after this time the message will appear in the queue.
When declaring the exchange, we pass an x-delayed-type argument with the type of the underlying exchange, e.g. direct or topic. This argument is required.
After being sent, delayed messages are stored in Mnesia, the database built into Erlang. They are kept in a table with a single disk replica on the current node. RAM nodes are not supported in this case.
Pros:
- a dedicated exchange supporting message delay
- potentially suitable for production use as long as the user is aware of its limitations
Cons:
- the plugin is still considered experimental
- it has to be installed separately
- works only with newer RabbitMQ versions
- a bit slower than the native solution
Example:
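    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use PhpAmqpLib\Wire\AMQPTable;

    // HOST, PORT, USER, PASS and VHOST are placeholders for your connection settings.
    $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
    $channel = $connection->channel();

    // The queue the consumer reads from.
    $channel->queue_declare(
        queue: 'example_queue',
        durable: true,
        auto_delete: false
    );

    // The delayed exchange; x-delayed-type tells the plugin how to route once the delay has passed.
    $channel->exchange_declare(
        exchange: 'example_exchange',
        type: 'x-delayed-message',
        durable: true,
        auto_delete: false,
        arguments: new AMQPTable(
            [
                'x-delayed-type' => 'direct'
            ]
        )
    );
    $channel->queue_bind('example_queue', 'example_exchange');

    // The message will reach example_queue after 10 seconds.
    $message = new AMQPMessage('test', [
        'application_headers' => new AMQPTable(['x-delay' => 10000])
    ]);
    $channel->basic_publish($message, 'example_exchange');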
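To tie this back to the opening scenario, here is a minimal sketch of what the consumer side could look like: when the external API answers with a 5xx code, the message is published again with an x-delay header and the original delivery is acknowledged. It assumes php-amqplib loaded through Composer autoloading and the exchange and queue declared above. The x-attempt header, MAX_ATTEMPTS, RETRY_DELAY_MS and the $callExternalApi callable are hypothetical names used only for illustration.

```php
<?php
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

const MAX_ATTEMPTS = 5;       // hypothetical retry limit
const RETRY_DELAY_MS = 10000; // hypothetical delay between attempts

// Returns true for HTTP status codes in the 5xx range.
function isServerError(int $statusCode): bool
{
    return $statusCode >= 500 && $statusCode <= 599;
}

// Builds a new message with the same body, an x-delay header for the plugin
// and an x-attempt counter (the counter is our own header, not a RabbitMQ one).
function buildRetryMessage(string $body, int $attempt, int $delayMs): AMQPMessage
{
    return new AMQPMessage($body, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'application_headers' => new AMQPTable([
            'x-delay' => $delayMs,
            'x-attempt' => $attempt,
        ]),
    ]);
}

// Consumer callback body. $callExternalApi is a hypothetical callable that
// takes the message body and returns the HTTP status code of the API call.
function handleMessage(AMQPChannel $channel, AMQPMessage $message, callable $callExternalApi): void
{
    $headers = $message->has('application_headers')
        ? $message->get('application_headers')->getNativeData()
        : [];
    $attempt = (int) ($headers['x-attempt'] ?? 0);

    $statusCode = $callExternalApi($message->getBody());

    if (isServerError($statusCode) && $attempt < MAX_ATTEMPTS) {
        // Schedule another attempt through the delayed exchange.
        $retry = buildRetryMessage($message->getBody(), $attempt + 1, RETRY_DELAY_MS);
        $channel->basic_publish($retry, 'example_exchange');
    }

    // Acknowledge the original delivery in every case; once the limit is
    // reached the message is simply dropped in this sketch.
    $message->ack();
}
```

Such a function could be wrapped in a closure and registered with basic_consume on example_queue, with no_ack set to false so that the explicit ack applies. A real consumer would probably log or park messages that exceed the retry limit instead of dropping them.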
Dead Letter Queue
This solution is built into RabbitMQ and needs no plugin. It relies on creating two queues.
The first one is created with three arguments:
- x-dead-letter-exchange
- x-dead-letter-routing-key
- x-message-ttl
We publish messages to this queue, but we don't consume them from it.
The x-message-ttl argument defines how long, in milliseconds, a message is kept in that queue. After that time the message is moved to the exchange declared in the x-dead-letter-exchange argument, with the routing key declared in x-dead-letter-routing-key.
As a result, every message that expires in the first queue is passed to the second one, which is the queue we actually consume from.
Pros:
- does not require installing additional plugins
- native solution
- also works on older RabbitMQ versions
Cons:
- requires creating an additional queue for each destination queue
- changing the TTL requires recreating the queue
Example:
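    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Exchange\AMQPExchangeType;
    use PhpAmqpLib\Message\AMQPMessage;
    use PhpAmqpLib\Wire\AMQPTable;

    // HOST, PORT, USER, PASS and VHOST are placeholders for your connection settings.
    $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
    $channel = $connection->channel();

    // The queue the consumer reads from.
    $channel->queue_declare(
        queue: 'destination_queue',
        durable: true,
        auto_delete: false
    );

    // The queue that only holds messages until their TTL expires.
    $channel->queue_declare(
        queue: 'delay_queue',
        durable: true,
        auto_delete: false,
        arguments: new AMQPTable(
            [
                'x-dead-letter-exchange' => 'delayed_exchange',
                'x-dead-letter-routing-key' => 'destination_queue',
                'x-message-ttl' => 10000
            ]
        )
    );

    $channel->exchange_declare(
        exchange: 'delayed_exchange',
        type: AMQPExchangeType::DIRECT,
        durable: true,
        auto_delete: false
    );
    $channel->queue_bind('delay_queue', 'delayed_exchange', 'delay_queue');
    $channel->queue_bind('destination_queue', 'delayed_exchange', 'destination_queue');

    // Published to delay_queue; dead-lettered to destination_queue after 10 seconds.
    $message = new AMQPMessage('test');
    $channel->basic_publish($message, 'delayed_exchange', 'delay_queue');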
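Because the TTL is fixed when the queue is declared, one way to support several different delays is to declare one delay queue per TTL. The sketch below shows that idea under a few assumptions: php-amqplib loaded through Composer autoloading, and the delayed_exchange and destination_queue from the example above already declared. The delay_queue_<ttl> naming scheme and the helper functions are hypothetical and only illustrate the approach.

```php
<?php
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Wire\AMQPTable;

// Hypothetical naming scheme: one delay queue per TTL, e.g. "delay_queue_30000".
function delayQueueName(int $ttlMs): string
{
    return 'delay_queue_' . $ttlMs;
}

// Arguments for a delay queue whose expired messages are dead-lettered
// to destination_queue through delayed_exchange.
function delayQueueArguments(int $ttlMs): array
{
    return [
        'x-dead-letter-exchange' => 'delayed_exchange',
        'x-dead-letter-routing-key' => 'destination_queue',
        'x-message-ttl' => $ttlMs,
    ];
}

// Declares the delay queue for the given TTL, binds it to delayed_exchange
// under its own name and returns that name for use as a routing key.
function declareDelayQueue(AMQPChannel $channel, int $ttlMs): string
{
    $name = delayQueueName($ttlMs);

    $channel->queue_declare(
        queue: $name,
        durable: true,
        auto_delete: false,
        arguments: new AMQPTable(delayQueueArguments($ttlMs))
    );
    $channel->queue_bind($name, 'delayed_exchange', $name);

    return $name;
}
```

With this in place, a message that should be delayed by 30 seconds would be published to delayed_exchange with the routing key returned by declareDelayQueue($channel, 30000). The trade-off is the one listed in the cons: every distinct delay means one more queue to manage.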
Summary
Both solutions are easy to use, and each has its advantages and disadvantages. By choosing the first one, we get a dedicated exchange. However, we have to install an additional plugin, which, although still considered experimental, is fairly stable for production use.
The second solution is native and faster. However, it requires creating additional queues and is therefore not as transparent as the first one.