Symfony Messenger is a great component that lets developers easily move task execution to the background so that the user does not have to wait for it to finish. Common tasks to send to the background are sending an email, processing a payment and, in general, any task that can take some time to finish.
In this post we are going to use the events that messenger dispatches while it is handling a message to monitor it.
Requirements
To follow this post we will need the following elements installed:
- A ready symfony project: To create a symfony project you can use the symfony-cli.
- Composer: This is the most commonly used php package manager. Symfony manages its vendors using it. You can install composer from its site.
- Docker and docker compose: We will use docker as a container manager and docker-compose as a tool to configure and start a redis container. If you have not used them so far, refer to the links to install them.
- Symfony messenger: We also need to install symfony messenger using composer. Once composer is installed, install messenger using the command below:
composer require symfony/messenger
- Redis bundle: We will use redis to record hits for the messenger events. To install the redis bundle, use the following command:
composer require snc/redis-bundle
In the next sections we're going to configure the redis bundle to use the phpredis library (the redis PHP extension). Install it before continuing.
Installing redis server
We are going to install a redis server using a docker-compose file. Create a file named docker-compose.yaml in your project root folder and paste the following content:
version: '3'
services:
  redis:
    image: redis
    ports:
      - '6401:6379'
In the above example, we're telling compose to create our docker container from the latest redis image and to map the container's redis port (6379) to port 6401 on the host.
Now, to start the container, go to your project root folder and use the following command:
docker-compose up -d
This command will download the redis image and start our container. Once it finishes, we will have a redis server running and listening on port 6401.
Configuring the redis bundle
Open the redis bundle package configuration file (config/packages/snc_redis.yaml) and paste the following content:
snc_redis:
  clients:
    default:
      type: phpredis
      alias: default
      dsn: "redis://localhost:6401"
When the redis bundle reads the above configuration, it will create a service with the following id: snc_redis.default. This service is an instance of \Redis class which receives localhost as a host and 6401 as a port.
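To make the mapping from DSN to host and port concrete, here is a minimal sketch that splits a DSN of the form redis://host:port using PHP's parse_url. The function name hostAndPortFromDsn is hypothetical and this is only an illustration, not the bundle's own parsing code.

```php
<?php
// Illustration only: splits a DSN into host and port with parse_url().
// hostAndPortFromDsn() is a hypothetical helper, not part of snc/redis-bundle.
function hostAndPortFromDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException("Cannot parse DSN: {$dsn}");
    }

    // 6379 is the default redis port, used when the DSN omits one.
    return [$parts['host'], $parts['port'] ?? 6379];
}

[$host, $port] = hostAndPortFromDsn('redis://localhost:6401');
printf("%s:%d\n", $host, $port); // localhost:6401
```

The port here is the host port published by the docker-compose file (6401), not the port redis listens on inside the container.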
Now, go to your services.yaml file and bind a variable which will hold the snc_redis.default service:
services:
  _defaults:
    autowire: true
    autoconfigure: true
    bind:
      $redis: '@snc_redis.default'
After doing it, we can inject the $redis variable into the constructor of any of our services.
The Messenger events
Symfony messenger dispatches the following events while it is handling a message:
- WorkerMessageReceivedEvent: When a message is received
- WorkerMessageHandledEvent: When a message is handled
- WorkerMessageFailedEvent: When a message handling fails
- WorkerMessageRetriedEvent: When a message handling is retried.
In the next sections we will listen to these events and will increment the corresponding redis key according to the event received.
The keys
We are going to use two types of keys:
- The general key: This key will hold the hits for all messages. It will be created using the following format:
"messenger:monitor:<time_event>:<messenger_event>"
There will be a general key for every time / event combination.
- The message key: This key will hold the hits for a specific message. It will be created using the following format:
"messenger:<message>:monitor:<time_event>:<messenger_event>"
There will be a message key for every message / time / event combination.
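As a quick illustration of the two formats, the following sketch builds every key for one message. The function names and the sample values (day, hour, received, handled, HelloWorldMessage) are assumptions chosen for the example.

```php
<?php
// Hypothetical helpers that follow the two key formats described above.
function generalKey(string $timeEvent, string $messengerEvent): string
{
    return "messenger:monitor:{$timeEvent}:{$messengerEvent}";
}

function messageKey(string $message, string $timeEvent, string $messengerEvent): string
{
    return "messenger:{$message}:monitor:{$timeEvent}:{$messengerEvent}";
}

// Sample values, assumed for illustration only.
foreach (['day', 'hour'] as $timeEvent) {
    foreach (['received', 'handled'] as $messengerEvent) {
        echo generalKey($timeEvent, $messengerEvent), "\n";
        echo messageKey('HelloWorldMessage', $timeEvent, $messengerEvent), "\n";
    }
}
```

With two time events and two messenger events, this prints four general keys and four message keys.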
The service to hit redis
Before creating the service, let's create a couple of enums which will contain time events and messenger events.
enum MessengerEvents: string
{
    case RECEIVED_EVENT = 'received';
    case HANDLED_EVENT = 'handled';
    case FAILED_EVENT = 'failed';
    case RETRIED_EVENT = 'retried';
}

enum MessengerTimeEvents: string
{
    case DAY_TIME_EVENT = 'day';
    case HOUR_TIME_EVENT = 'hour';
}
We are going to hit redis when the following events occur:
- received: When WorkerMessageReceivedEvent is received
- handled: When WorkerMessageHandledEvent is received
- failed: When WorkerMessageFailedEvent is received
- retried: When WorkerMessageRetriedEvent is received
For every event received, we are going to hit both the general key and the message key twice: once for the day and once for the hour.
class MessengerHandler
{
    public function __construct(
        private readonly \Redis $redis
    ){}

    /**
     * @throws \RedisException
     */
    public function hitEvent(string $message, MessengerEvents $event): void
    {
        // Queue the four increments and send them to redis in a single request.
        $pipeline = $this->redis->pipeline();
        // 'H' is the 24-hour format; 'h' (12-hour) would mix morning and afternoon hits.
        $pipeline->hincrby($this->getMessageKey($message, $event, MessengerTimeEvents::DAY_TIME_EVENT), date('Ymd'), 1);
        $pipeline->hincrby($this->getMessageKey($message, $event, MessengerTimeEvents::HOUR_TIME_EVENT), date('YmdH'), 1);
        $pipeline->hincrby($this->getGeneralKey($event, MessengerTimeEvents::DAY_TIME_EVENT), date('Ymd'), 1);
        $pipeline->hincrby($this->getGeneralKey($event, MessengerTimeEvents::HOUR_TIME_EVENT), date('YmdH'), 1);
        $pipeline->exec();
        $pipeline->close();
    }

    private function getMessageKey(string $message, MessengerEvents $event, MessengerTimeEvents $timeEvent): string
    {
        return "messenger:{$message}:monitor:{$timeEvent->value}:{$event->value}";
    }

    private function getGeneralKey(MessengerEvents $event, MessengerTimeEvents $timeEvent): string
    {
        return "messenger:monitor:{$timeEvent->value}:{$event->value}";
    }
}
Let's explain the code, starting with the private methods:
getMessageKey: This method receives the message, the messenger event and the time event. It uses these parameters to build the key, using the format we saw in the last section, and returns it.
getGeneralKey: It behaves as getMessageKey but without using the message string.
Now, let's see the hitEvent method. As we can see, it receives the message (we will see later how to get the message class from the envelope) and a MessengerEvents enum instance holding the event received.
Then, the method opens a redis pipeline, queues the hit for every key and executes it.
We use a pipeline to send all the hits to redis at once. This allows us to send 1 request to redis instead of 4.
Notice that we are using the redis HASH type. We use dates as the hash fields (format "Ymd" for days and "YmdH" for hours) and the hit counts as the values.
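Because the hour fields start with the day, the hash layout is easy to aggregate once it has been read back. The sketch below assumes an array shaped like the result of an hgetall call on an hour key (field => count) and sums the hits for one day; the function name hitsForDay and the sample data are hypothetical.

```php
<?php
/**
 * Sums the hourly hits that belong to a given day.
 *
 * @param array<int|string, string> $hourHash field ("YmdH") => count, shaped like an hgetall result
 * @param string                    $day      day in "Ymd" format
 */
function hitsForDay(array $hourHash, string $day): int
{
    $total = 0;
    foreach ($hourHash as $field => $count) {
        // PHP turns numeric string keys into integers, so cast back before comparing.
        if (str_starts_with((string) $field, $day)) {
            $total += (int) $count;
        }
    }

    return $total;
}

// Sample data, invented for illustration.
$sample = ['2024010109' => '3', '2024010114' => '2', '2024010200' => '5'];
echo hitsForDay($sample, '20240101'), "\n"; // 5
```

In practice the day hash already holds this total, so a helper like this is mostly useful as a consistency check between the two time events.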
The event subscriber
Now, let's create the subscriber which will listen to the messenger events and will use our recently created MessengerHandler service.
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;

class MessengerSubscriber implements EventSubscriberInterface
{
    public function __construct(
        private readonly MessengerHandler $messengerHandler
    ){}

    public static function getSubscribedEvents(): array
    {
        return [
            WorkerMessageReceivedEvent::class => 'onMessageReceived',
            WorkerMessageHandledEvent::class => 'onMessageHandled',
            WorkerMessageFailedEvent::class => 'onMessageFailed',
            WorkerMessageRetriedEvent::class => 'onMessageRetried'
        ];
    }

    /**
     * @throws \RedisException
     */
    public function onMessageReceived(WorkerMessageReceivedEvent $event): void
    {
        $messageClass = new \ReflectionClass($event->getEnvelope()->getMessage());
        $this->messengerHandler->hitEvent($messageClass->getShortName(), MessengerEvents::RECEIVED_EVENT);
    }

    /**
     * @throws \RedisException
     */
    public function onMessageHandled(WorkerMessageHandledEvent $event): void
    {
        $messageClass = new \ReflectionClass($event->getEnvelope()->getMessage());
        $this->messengerHandler->hitEvent($messageClass->getShortName(), MessengerEvents::HANDLED_EVENT);
    }

    /**
     * @throws \RedisException
     */
    public function onMessageFailed(WorkerMessageFailedEvent $event): void
    {
        $messageClass = new \ReflectionClass($event->getEnvelope()->getMessage());
        $this->messengerHandler->hitEvent($messageClass->getShortName(), MessengerEvents::FAILED_EVENT);
    }

    /**
     * @throws \RedisException
     */
    public function onMessageRetried(WorkerMessageRetriedEvent $event): void
    {
        $messageClass = new \ReflectionClass($event->getEnvelope()->getMessage());
        $this->messengerHandler->hitEvent($messageClass->getShortName(), MessengerEvents::RETRIED_EVENT);
    }
}
The subscriber is pretty straightforward. It links a method to each messenger event using the getSubscribedEvents function. The four subscriber methods perform the same logic: they create a \ReflectionClass from the envelope message and use $messengerHandler to hit the event. To get the message class name, they rely on the \ReflectionClass getShortName method. As a second parameter, each method passes its corresponding event from the enum.
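To see what getShortName contributes to the key, here is a small standalone sketch. DemoMessage and the shortName function are hypothetical names used only for this illustration.

```php
<?php
namespace App\Messenger\Message;

// Hypothetical message class, defined only for this illustration.
final class DemoMessage
{
}

// Returns the class name without its namespace, as the subscriber does.
function shortName(object $message): string
{
    return (new \ReflectionClass($message))->getShortName();
}

$message = new DemoMessage();
echo $message::class, "\n";       // App\Messenger\Message\DemoMessage
echo shortName($message), "\n";   // DemoMessage
```

One consequence of this choice is that two message classes with the same short name in different namespaces would share the same message key, so the full class name may be a safer key segment if that can happen in your project.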
Checking the work
To check that all of this works, we are going to create a message and its handler and route it to a transport. Since we've installed redis, we will use the built-in messenger redis transport. Install redis-messenger using the following command:
composer require symfony/redis-messenger
If you get the following error: Invalid configuration for path "snc_redis.clients.default": You must install "ocramius/proxy-manager" or "friendsofphp/proxy-manager-lts" in order to enable logging for phpredis client, simply install proxy-manager-lts using composer too:
composer require friendsofphp/proxy-manager-lts
Now, let's create the message and the message handler:
namespace App\Messenger\Message;

class HelloWorldMessage
{
    public function __construct(
        public readonly string $name
    ){}
}

namespace App\Messenger\Message;

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class HelloWorldMessageHandler
{
    public function __invoke(HelloWorldMessage $message): void
    {
        echo 'Hello, my name is ' . $message->name;
    }
}
The message holds a name parameter and the handler simply writes an introductory sentence using it.
Now, let's configure the transport in the messenger package file (config/packages/messenger.yaml):
framework:
  messenger:
    transports:
      async: "redis://localhost:6401"
    routing:
      'App\Messenger\Message\HelloWorldMessage': async
The above configuration creates the async transport which will use the redis server we installed before. Then, it configures our HelloWorldMessage to be routed to the async transport.
Now, create a symfony command, inject the Symfony\Component\Messenger\MessageBusInterface and queue a HelloWorldMessage message.
public function execute(InputInterface $input, OutputInterface $output): int
{
    $this->bus->dispatch(new HelloWorldMessage('Peter Parker'));

    return Command::SUCCESS;
}
After executing the command you will have a new enqueued message. Now, execute a worker so you can consume it:
bin/console messenger:consume async -vv --limit=1
If everything went fine, the general and message keys should have been created. To check it, install redis-cli and connect to the server using the following command:
redis-cli -h localhost -p <your_port>
Once you are in the shell, use the keys command (for example, KEYS messenger:*) to check that the keys exist.
To check the data stored in a key, use the hgetall command (for example, HGETALL messenger:monitor:day:handled).
Let's check now that the failed and retried keys are also created. We will have to force the message handler to fail. To do it, comment out the echo line and throw an exception instead:
public function __invoke(HelloWorldMessage $message): void
{
    //echo 'Hello, my name is ' . $message->name;
    throw new \Exception('I am failed :(');
}
If we queue another message and start a worker, we will see that it fails and symfony retries it 3 times (which is the default retry policy).
We can now use the same keys and hgetall commands to check that the failed and retried keys have been created.
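As a rough guide to what the counters should show for one always-failing message, the sketch below derives the expected hits from the number of retries. It assumes the worker keeps running until all retries are exhausted, that each retry produces a new received event and a new failed event, and that a retried event is hit once per retry; the function name expectedHits is hypothetical.

```php
<?php
// Hypothetical helper: expected hits for ONE message whose handler always throws.
// Assumes the worker consumes the original message and every retry.
function expectedHits(int $maxRetries): array
{
    $attempts = $maxRetries + 1; // first delivery plus each retry

    return [
        'received' => $attempts,
        'handled'  => 0,
        'failed'   => $attempts,
        'retried'  => $maxRetries,
    ];
}

// 3 retries, as in the default retry policy mentioned above.
foreach (expectedHits(3) as $event => $hits) {
    echo "{$event}: {$hits}\n";
}
```

If the numbers in redis differ, check whether the worker was stopped early (for example by the --limit option) before the retries were consumed.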
Conclusion
In this article we have learned how to use redis and the symfony messenger events to monitor messenger. These counters show us how our message queueing system is behaving, which helps us detect anomalies and possible failures.
I hope you found this post useful and interesting. If you enjoy my content and like the Symfony framework, consider reading my book: Building an Operation-Oriented Api using PHP and the Symfony Framework: A step-by-step guide