DEV Community

Cover image for Consuming RabbitMQ Queues in Laravel
Matt Inamdar for Health Place

Posted on

Consuming RabbitMQ Queues in Laravel

RabbitMQ is an open-source message broker with support for queues, exchanges and topics.

It's the "most widely deployed" open-source message broker at that (according to their website).

We decided to use RabbitMQ at Health Place in order to enable asynchronous event-based systems to complete tasks, only when the previous task had completed.

Our exact scenario was to get our ingestion pipeline working, which is composed of multiple services, in different languages.

RabbitMQ allows us to have a single service in the ingestion pipeline complete its task, push a message to RabbitMQ, and then have the next downstream service start its task (which may then do something similar, until the data is finally ingested).

Why not just use Laravel's queue system?

This is a good question, and ideally we'd have preferred it if this was possible.

The problem is to do with how messages are stored in the Laravel queue system.

Laravel calls messages "jobs", which are represented as Plain Old PHP Objects (POPO's). These jobs are then serialised into a string and pushed onto the queue driver configured.

This implementation is fine when the publisher and consumer are the same app. This is usually the case when your Laravel app needs to asynchronously perform a task (such as sending an email).

The problem appears when you introduce several apps, as is the case with our ingestion pipeline.

Our primary API, sitting in front of our database, is our Laravel app. Further upstream in our ingestion pipeline, we have a series of Node.js microservices responsible for several things, such as normalising and validating the data.

These microservices and Laravel need a shared way to communicate with one another via asynchronous events. The way to achieve that is by using a message broker (RabbitMQ) and publishing/consuming messages in JSON.

This works great as JSON is widely used as a standard interface format and has support in almost all modern languages.

Why not used webhooks instead of a queue?

Another good question, and the answer is to do with "simplicity".

If we were to go down the webhook approach, it would require the consumer of messages to now host a HTTP endpoint, which must always be up, in order to receive messages. This is troublesome for services that aren't web-facing.

This also means that the publisher needs to invoke this HTTP endpoint as well as introducing retry logic if the endpoint is ever down. And what should happen is all retries have failed?

A queue on the other hand, allows the message to be published and then any listener can consume this message if they're interested. An added benefit is that multiple services can consume the same message, even though the publisher only published it once.

This allows for great decoupling and flexibility.

Implementation

Now for the fun part! Let's take a look at how we implemented this at Health Place.

Install the RabbitMQ package

Start by installing this package which is a Laravel wrapper for the php-amqplib/php-amqplib package.

composer install bschmitt/laravel-amqp
Enter fullscreen mode Exit fullscreen mode

Once installed, make sure to register it as a service provider:

// config/app.php
return [
    //...

    'providers' => [
        //...

        Bschmitt\Amqp\AmqpServiceProvider::class,
    ],
];
Enter fullscreen mode Exit fullscreen mode

And finally to publish the config file:

php artisan vendor:publish --provider Bschmitt\\Amqp\\AmqpServiceProvider
Enter fullscreen mode Exit fullscreen mode

Be sure to check the newly created config/amqp.php file and add your credentials.

Creating the consumer Artisan command

In order to start consuming messages from RabbitMQ, we need to create an infinitely running Artisan command that will continuously check for messages on the exchange.

Here's a working example of such a command:

<?php

declare(strict_types=1);

namespace App\Console\Commands\HealthPlace\RabbitMQ;

use App\Exceptions\InvalidAMQPMessageException;
use App\Jobs\IngestDataJob;
use Bschmitt\Amqp\Amqp;
use Bschmitt\Amqp\Consumer;
use Exception;
use Illuminate\Console\Command;
use Illuminate\Foundation\Bus\DispatchesJobs;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class ConsumeCommand extends Command
{
    use DispatchesJobs;

    protected $signature = 'healthplace:rabbitmq:consume';

    protected $description = 'Runs a AMQP consumer that defers work to the Laravel queue worker';

    public function handle(Amqp $consumer, LoggerInterface $logger): bool
    {
        $logger->info('Listening for messages...');

        $consumer->consume(
            '',
            function (AMQPMessage $message, Consumer $resolver) use ($logger): void {
                $logger->info('Consuming message...');

                try {
                    $payload = json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR);
                    $this->validateMessage($payload);
                    $logger->info('Message received', $payload);
                    $this->dispatch(new IngestDataJob($payload['filepath']));
                    $logger->info('Message handled.');
                    $resolver->acknowledge($message);
                } catch (InvalidAMQPMessageException $exception) {
                    $logger->error('Message failed validation.');
                    $resolver->reject($message);
                } catch (Exception $exception) {
                    $logger->error('Message is not valid JSON.');
                    $resolver->reject($message);
                }
            },
            [
                'routing' => ['ingest.pending'],
            ]
        );

        $logger->info('Consumer exited.');

        return true;
    }

    private function validateMessage(array $payload): void
    {
        if (!is_string($payload['filepath'] ?? null)) {
            throw new InvalidAMQPMessageException('The [filepath] property must be a string.');
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

There's a few things going on here so let's break this down:

  1. The payload (the message content) is JSON decoded, with an exception being thrown if the JSON is malformed.
  2. The JSON itself is then validated to ensure the that it contains the fields in the format that we expect (very similar to request validation).
  3. A standard Laravel job is then pushed for the relevant task to be performed asynchronously.
  4. The message is acknowledged to RabbitMQ so it is aware that the consumer has successfully consumed the message.
  5. If the JSON fails to decode or if validation fails, the message is rejected to RabbitMQ.

You may be wondering what the purpose is for publishing a Laravel job from this command, and although it is completely possible to work the message directly in this command, it introduces some problems...

In our case, the ingestion job takes a long time, potentially hours depending on the size of the dataset. If this consume command was to process that, it would mean that we couldn't consume any further messages from RabbitMQ until the ingestion had finished.

What we opted to do instead, was to push a Laravel job from this and have a queue worker actually do the ingestion. We can even scale up our queue workers if we have a bunch of jobs waiting to be worked, all whilst our RabbitMQ consumer is actively listening for further messages.

And that wraps it up!

Get in touch

If you have any questions, comment below, and I'll get back to you.

And if you think you'd like to work somewhere like Health Place, then message me for a chat at matt@healthplace.io.

Top comments (0)