DEV Community

Cristiano Pacheco for run_as_root GmbH

Posted on

Preventing Transaction Loss: Unleashing the Power of Resilient Transactions with RabbitMQ dead-letter Exchanges in Magento 2

This post will guide Magento 2 developers through implementing a message queue retry mechanism using the RabbitMQ dead-letter exchange feature.

To assist us in this endeavor, we will leverage the Message Queue Retry module by run-as-root.

Table of Contents

What is RabbitMQ?

RabbitMQ is a widely embraced open-source Message Queue software, facilitating seamless interaction and scalability across numerous applications. As a proficient messaging broker, RabbitMQ furnishes a unified platform catering to Asynchronous messaging needs. Within this platform, applications gain the capability to transmit and receive messages securely, with RabbitMQ diligently ensuring the sanctuary of your messages until they are consumed.

Since version 2.1.0, Magento 2 has offered native RabbitMQ support, enabling effortless queue configuration through XML settings. For further details, you can explore additional information here

What is a dead-letter exchange?

This feature functions as a safety net within the messaging architecture, intended to manage messages that encounter unsuccessful processing by the consumer.

The dead-letter exchange operates as a mechanism for directing these problematic messages to a designated queue referred to as a dead-letter queue. This facilitates developers in isolating and inspecting troublesome messages without interrupting the primary processing flow, enabling them to analyze these messages, identify the underlying reasons for processing failures, and implement essential adjustments to ensure smooth operation.

It's particularly recommended for scenarios where upholding message integrity, diagnosing processing errors, and averting message loss is paramount.

Examples of transaction loss scenarios:

  • Exporting the customers' data to a CRM

    • Possible issue: The CRM credentials were modified, but the changes were not reflected in the Magento integration settings.
    • Consequence: This mismatch in credentials will result in failed API calls due to the usage of incorrect or invalid authentication information.
  • Exporting the orders to an ERP

    • Possible issue: During a marketing campaign, the store's throughput surged by 200%, leading to a substantial rise in incoming orders; however, the ERP system could only accommodate its standard order import capacity.
    • Consequence: Only a fraction of the increased order influx will be successfully exported to the ERP system, potentially causing delays and incomplete order processing.
  • Scheduled price uplift

    Imagine having the capability to schedule a comprehensive price uplift by 10% for the entire catalog of your store.

    • Possible issue: Deadlock encountered while saving product and price information.
    • Consequence: The product prices may not be accurate.

Practical case scenario

A typical case in e-commerce is exporting orders to an ERP or OMS. We will implement an asynchronous order export architecture, where for each placed order, a message is created with the order id and sent to a queue. So that a consumer can later read this message and export it to the external system.

To simplify this example, let's declare the topic's name, exchange, queue, and bindings with the same name: erp_order_export.

To handle this messaging, we'll utilize RabbitMQ as the broker.

You can find the complete code for the module on this repository.

We have to create four files to declare our queue.

etc/communication.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="erp_order_export" request="CristianoPacheco\OrderExport\Api\Data\OrderExportDataInterface"/>
</config>
Enter fullscreen mode Exit fullscreen mode

etc/queue_consumer.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="erp_order_export" queue="erp_order_export" connection="amqp"
              handler="CristianoPacheco\OrderExport\Queue\Consumer\ErpOrderExportConsumer::execute"
    />
</config>
Enter fullscreen mode Exit fullscreen mode

etc/queue_publisher.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="erp_order_export">
        <connection name="amqp" exchange="erp_order_export"/>
    </publisher>
</config>
Enter fullscreen mode Exit fullscreen mode

etc/queue_topology.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="erp_order_export" connection="amqp" type="topic">
        <binding id="erp_order_export" topic="erp_order_export" destinationType="queue" destination="erp_order_export"/>
    </exchange>
</config>
Enter fullscreen mode Exit fullscreen mode

Run the command below in order to Magento send the command to create the exchange, queue and bindings on RabbitMQ:

bin/magento setup:upgrade
Enter fullscreen mode Exit fullscreen mode

It will look like this on RabbitMQ admin:

exchange

Queue

Visually, our configs will generate something like this:

queue

Note: It would make more sense to configure a direct exchange. However, up to the current version of Magento (2.4.6), only topic exchanges are supported.

Now that the queue is set up, we must send the order ID to the erp_order_export queue every time a successful order is placed.

We can easily do this by listening to the event sales_model_service_quote_submit_success via an observer:

etc/events.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Event/etc/events.xsd">
    <event name="sales_model_service_quote_submit_success">
        <observer name="erp_export_order" instance="CristianoPacheco\OrderExport\Observer\SendOrderToQueueObserver"/>
    </event>
</config>
Enter fullscreen mode Exit fullscreen mode

I won't post here the Observer code or the entire implementation. The complete working code is in the repository. If you want to simulate the scenarios, please install the module.

Our consumer will use a service to perform our "fake" order export. To simulate an error, our service will always throw an exception:

<?php

declare(strict_types=1);

namespace CristianoPacheco\OrderExport\Service;

use CristianoPacheco\OrderExport\Api\Data\OrderExportDataInterface;
use Exception;

class ExportOrderToErpService
{
    public function execute(OrderExportDataInterface $orderExportData): void
    {
        $orderId = $orderExportData->getOrderId();
        throw new Exception("The order $orderId id is invalid");
    }
}
Enter fullscreen mode Exit fullscreen mode

With the current configuration, orders will enter the erp_order_export queue and can be "exported" during the execution of our consumer.

To run our consumer and export only one order per execution, run the following command:

bin/magento queue:consumer:start erp_order_export --single-thread --max-messages=1
Enter fullscreen mode Exit fullscreen mode

What will happen at the end of the execution?

The service will throw an exception, and it will be handled here:

vendor/magento/framework-message-queue/Consumer.php:261

exception-handle

As you can see, the message will be rejected, and the requeue flag will have false as the value. In other words, the message will be removed from the queue, and the order will not be exported to the ERP.

Well done! We managed to simulate a transaction loss.

In day-to-day scenarios, probably (I hope so), exceptions are handled at the service or the consumer's execution entry point, and the message would be sent back to the original queue or even taken manually in cases of error, such as sending an alert, or persisting the message in a database, etc.

What if there was a way to automatically identify without manual work when there are errors in message processing, allowing the possibility to optimize and standardize this process?

Yes, there is. We can delegate this hard work to RabbitMQ via a dead-letter exchange.

Crafting an Effective Retry Strategy for Queue Messages

So, how do we ensure that our messages reach their destination in the face of these intermittent failures? This is where the concept of a retry strategy comes into play. A retry strategy outlines how we handle failed operations by making subsequent attempts at sending the same message. When exporting an order to ERP, adopting a well-defined retry strategy can significantly enhance the reliability of our application.

For this specific case, three attempts are reasonable.

So let's define our requirements, it should be possible to:

  • Reprocess the messages
  • Set processing retry limit
  • Set an interval in minutes for each retry attempt
  • Find out what error was generated during message processing
  • See what the message payload is
  • If the limit of retries is reached, we must provide a way to manage message processing manually
    • Resend the message to the queue
    • Discard the message

Steps to achieve our goal:

  1. We will add a second queue (erp_order_export_delay) in RabbitMQ to store the message for a specific lifetime. This queue will not have a consumer. It will only serve to "delay" processing the message in the source queue (erp_order_export).

  2. When the time to live is reached, RabbitMQ will automatically send the message to the source queue (erp_order_export).

  3. If the retry limit is reached, the message will be stored in a database table for manual management.

    • Maybe the ERP server is down, or some order data is incorrect, so we have to reprocess the messages later.

Steps 1 and 2 can be configured in the queue declaration, and RabbitMQ natively supports it.

Step 3 is a capability provided by the run as root Message Queue Retry module.

Practical case scenario with queue retry

Firstly we need to install the message queue retry module and enable the behavior to persist the messages on the database after the retry limit is reached.

No we have to configure our dead-letter exchange. The complete implementation of code below can be found here.

We'll use the exp_order_export_delay as the name for the topic, exchange, queue and bindings.

Declaring the topic:

etc/communication.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="erp_order_export" request="CristianoPacheco\OrderExport\Api\Data\OrderExportDataInterface"/>
    <topic name="erp_order_export_delay" request="CristianoPacheco\OrderExport\Api\Data\OrderExportDataInterface"/>
</config>
Enter fullscreen mode Exit fullscreen mode

Declaring the retry limit:

etc/queue_retry.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:RunAsRoot:module:RunAsRoot_MessageQueueRetry:/etc/queue_retry.xsd">
    <topic name="erp_order_export" retryLimit="3"/>
</config>
Enter fullscreen mode Exit fullscreen mode

Declaring the dead-letter exchange:

etc/queue_topology.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="erp_order_export" connection="amqp" type="topic">
        <binding id="erp_order_export" topic="erp_order_export" destinationType="queue" destination="erp_order_export">
            <arguments>
                <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export_delay</argument>
                <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export_delay</argument>
            </arguments>
        </binding>
    </exchange>
    <exchange name="erp_order_export_delay" connection="amqp" type="topic">
        <binding id="erp_order_export_delay" topic="erp_order_export_delay" destinationType="queue"
                 destination="erp_order_export_delay">
            <arguments>
                <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export</argument>
                <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export</argument>
                <argument name="x-message-ttl" xsi:type="number">60000</argument><!-- 1 minute in  milliseconds -->
            </arguments>
        </binding>
    </exchange>
</config>
Enter fullscreen mode Exit fullscreen mode

Let's take a closer look at each change in the file above.

1- the arguments x-dead-letter-exchange and x-dead-letter-routing-key will tell RabbitMQ to automatically send the messages to the erp_order_export_delay exchange in case the message is negatively acknowledged (rejected or nack with requeue parameter set to false).

...
<arguments>
    <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export_delay</argument>
    <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export_delay</argument>
</arguments>
...
Enter fullscreen mode Exit fullscreen mode

2- We declared a the erp_order_export_delay exchange and bound it to the erp_order_export_delay queue

...
<exchange name="erp_order_export_delay" connection="amqp" type="topic">
    <binding id="erp_order_export_delay" topic="erp_order_export_delay" destinationType="queue" destination="erp_order_export_delay">
...
Enter fullscreen mode Exit fullscreen mode

3- The arguments x-dead-letter-exchange and x-dead-letter-routing-key defines the return path for the message and the x-message-ttl has an integer number in milliseconds that defines when the message will return to the exchanged defined in the previous two arguments.

Remembering that this queue does not have a consumer.

...
<arguments>
   <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export</argument>
   <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export</argument>
   <argument name="x-message-ttl" xsi:type="number">60000</argument><!-- 1 minute in  milliseconds -->
</arguments>
...
Enter fullscreen mode Exit fullscreen mode

Visually, our new configs will generate something like this:

queue-after-config

Note: We always send the messages to an exchange and the exchange will forward the messages to one or more queues. It is the same with dead-letter exchanges.

Now we have to manually delete the erp_order_export exchange and queue in RabbitMQ, because it won't be updated. You can do it in the RabbitMQ admin panel or via CLI.

After you did it, you have to run the command below to reflect our changes in RabbitMQ:

bin/magento setup:upgrade
Enter fullscreen mode Exit fullscreen mode

Now if we navigate again in the RabbitMQ panel we should see:

Exchanges:

dead-letter-exchanges

If we open our exchanges an expand the bindings, is it possible to see the arguments we have declared before:

exchange-1

exchange-2

Queues:

Queues

Did you notice that queues now have a few more features?

queue-features

A homework for you, hover the mouse over each feature and wait a few seconds, a caption will be displayed informing what it is about. Also access each queue and expand the bindings section, see the values contained there.

Now that we finished our RabbitMQ configuration, let's place a new order and start the erp_order_export consumer to see what will occur.

After place an order a new massage is in the erp_order_export queue:

first-execution

Now let's start our consumer:

bin/magento queue:consumer:start erp_order_export --single-thread --max-messages=1
Enter fullscreen mode Exit fullscreen mode

Now if we take a look at the queues we'll see:

second-execution

During message consumption, our service threw an exception, simulating a problem and the message was rejected. RabbitMQ sent the message to the erp_order_expor_delay queue.

After one minute the message was automatically sent to the erp_order_export queue.

Let's inspect the message headers:

first-rejection

The x-death header has interesting information about the retries.

Relevant attribute for us to analyse are:

  • count - it is the retry number
  • reason - In our case, can be rejected or expired
  • time - The timestamp of the occurrence
  • exchange, queue, routing-keys - Where the message was at the time of occurrence

So, what that image says is:

Reading from bottom to top

First occurrence

The message was in the erp_order_export queue and while processing it, it was rejected on Saturday, August 26, 2023 9:12:33 PM GMT-03:00

Second occurrence

The message was in the erp_order_export_delay queue, and it was rejected on Saturday, August 26, 2023 9:12:53 PM GMT-03:00

It has 20 seconds difference because I configured the TTL for 20 seconds :)

This is the message headers after the second retry:

second-execution

After the third retry:

third-execution

What will occur in the next consumer execution?

fourth-execution

So, the run-as-root module intercepted the message processing failure, verified that the message has reached the retry limit and then persisted the message in the Magento database.

Now it is possible to manage the message through the admin panel, in the path:

Run as Root > Manage Messages

magento-admin

Now to can decide if you'll discard the message or send it back to the erp_order_export queue.

In summary, while multiple approaches exist to tackle this issue, my aim has been to show a straightforward method for to implement a retry mechanism.

That's it for today, happy coding!

Top comments (4)

Collapse
 
riconeitzel profile image
Rico Neitzel

wow … that's a very informative article with a lot of useful details 👏🏻👏🏻👏🏻

Collapse
 
cristiano-pacheco profile image
Cristiano Pacheco

Thank you Rico. Coming from you is an honor!

Collapse
 
aquarvin profile image
Aleksandr Ishchenko

Very useful module, thank you. However, I think names for topic, exchange, queue binding ID etc should be unique rather than one universal erp_order_export.
Got so confused when tried to configure my topics, exchanges queues and consumers

Collapse
 
aquarvin profile image
Aleksandr Ishchenko

So if I don't run cli command
bin/magento queue:consumer:start erp_order_export --single-thread --max-messages=1 , then consumer never starts?
Something is missing here.