DEV Community

Adrian Garay
Adrian Garay

Posted on

CQRS with Symfony Messenger

Ve la versión en español

Introduction

Habitually we use the same data structure for writing and querying information on a system, for larger systems this can cause bigger data structures because it needs to integrate read and write in the same model. For example, when writing information we may need a lot of validations in order to ensure the information that we are persisting is correct, asking for this information can be different and complex in order to retrieve filtered data or different data structures for each case.

CQRS is a pattern that separates read and update operations for a data store. Implementing CQRS in your application can maximize its performance, scalability, and security. The flexibility created by migrating to CQRS allows a system to better evolve over time and prevents update commands from causing merge conflicts at the domain level.

The pattern

CQRS separates the read structure using queries in order to read data and the write model using commands to make operations with the data

  • Commands should be task-based, which means that we need to focus on the operation of the command, for example, on a delivery app when ordering something we will name the operation OrderProductCommand instead of AddProductToClient or CreateNewOrderProduct this also makes our application layer more consistent
  • Queries never modify the database. A query returns a DTO that does not encapsulate any domain knowledge. We need to focus on the information needed not on the domain behavior.

Benefits

  • Independent scaling. It allows the read and write models to scale independently
  • Optimized data schemas. The read model can use a schema that is optimized for queries, and the write model uses a schema that is optimized for updates.
  • Security. It's easier to ensure that only the right domain entities are performing writes.
  • Separation of concerns. The complex business logic goes into the write model. The read model can be simple.

Implement CQRS with a Symfony Messenger

The messenger component helps applications send and receive messages to/from other applications or via message queues. It also allows us to define custom message buses defining message types and handlers.

Let's talk about the software architecture.

CommandBus

Command bus architecture

Talking about commands we need to modulate a common interface that a message bus can manage and transport to handlers, the Command interface ends up being our base interface for each command. Each Command Handler implementor is going to make operations with the command given but the command itself doesn’t know what operation is performed. Also, we are going to create an interface for the Command Bus in order to create different kinds of Transporters for our messages (commands), in this case, we create one In Memory Command Bus but we can extend this concept easily if needed (ex: A queued work).

We will arrive with something like this on the src code of our Symfony application.

Command dir organization



<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface Command
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface CommandBus
{
    public function dispatch(Command $command) : void;
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface CommandHandler
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus\Command;

use ...

final class InMemoryCommandBus implements CommandBus
{
    private MessageBus $bus;

    public function __construct(
        iterable $commandHandlers
    ) {
        $this->bus = new MessageBus([
            new HandleMessageMiddleware(
                new HandlersLocator(
                    HandlerBuilder::fromCallables($commandHandlers),
                ),
            ),
        ]);
    }

    /**
     * @throws Throwable
     */
    public function dispatch(Command $command): void
    {
        try {
            $this->bus->dispatch($command);
        } catch (NoHandlerForMessageException $e) {
            throw new InvalidArgumentException(sprintf('The command has not a valid handler: %s', $command::class));
        } catch (HandlerFailedException $e) {
            throw $e->getPrevious();
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

For the In Memory Command Bus class we will need to register each Command Handler coming from the service definition, we will profit from a Symfony feature named Service tags provided from the Service Container and the Autoconfiguration, it allows us to tag a service that we can ask for it later, in config/services.yaml we indicate the Service Container to tag each instance of the Command Handler interface with the tag internal.command_handler and later we declare our In Memory Command Bus passing all the implementors of Command Handler as an iterable argument. The Command Bus will take each Command Handler and declare the Command Expected with the appropriate Handler.



parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    _instanceof:
        App\Shared\Domain\Bus\Command\CommandHandler:
            tags: ['internal.command_handler']
...
    ### Buses
    App\Shared\Domain\Bus\Command\CommandBus:
        class: App\Shared\Infrastructure\Bus\Command\InMemoryCommandBus
        arguments: [!tagged internal.command_handler]


Enter fullscreen mode Exit fullscreen mode

We can create a Handler Builder tool that is responsible for searching on the __invoke of the Command Handler implementor and take the first argument type as the command needed for calling the handler. At this point, we create a convention where each Command Handler needs to be callable and have only one parameter with the type of the Command.



<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus;

use ...

final class HandlerBuilder
{
    /**
     * @throws ReflectionException
     */
    public static function fromCallables(iterable $callables) : array
    {
        $callablesHandlers = [];

        foreach ($callables as $callable) {
            $envelop = self::extractFirstParam($callable);

            if (! array_key_exists($envelop, $callablesHandlers)) {
                $callablesHandlers[self::extractFirstParam($callable)] = [];
            }

            $callablesHandlers[self::extractFirstParam($callable)][] = $callable;
        }

        return $callablesHandlers;
    }

    /**
     * @throws ReflectionException
     */
    private static function extractFirstParam(object|string $class) : string|null
    {
        $reflection = new ReflectionClass($class);
        $method     = $reflection->getMethod('__invoke');

        if ($method->getNumberOfParameters() === 1) {
            return $method->getParameters()[0]->getClass()?->getName();
        }

        return null;
    }
}


Enter fullscreen mode Exit fullscreen mode

Having that, we can start building our commands, for example:

Command implementation organization



<?php

declare(strict_types=1);

namespace App\EmailSender\Application\Create;

use ...

final class CreateEmailCommand implements Command
{
    public function __construct(
        private readonly string $sender,
        private readonly string $addressee,
        private readonly string $message,
    ) {
    }

    public function sender(): string
    {
        return $this->sender;
    }

    public function addressee(): string
    {
        return $this->addressee;
    }

    public function message(): string
    {
        return $this->message;
    }
}


Enter fullscreen mode Exit fullscreen mode

We can create a Create Email Command, it contains the information needed to create a new email but it doesn’t know the process needed for that.



<?php

declare(strict_types=1);

namespace App\EmailSender\Application\Create;

use ...

class CreateEmailCommandHandler implements CommandHandler
{
    public function __construct(private EmailRepository $repository)
    {
    }

    public function __invoke(CreateEmailCommand $command) : EmailId {
        $email = Email::createNewEmail(
            sender: new EmailAddress($command->sender()),
            addressee: new EmailAddress($command->addressee()),
            message: new Message($command->message()),
        );

        $this->repository->save($email);

        return $email->id();
    }
}


Enter fullscreen mode Exit fullscreen mode

We create a handler for the previous Command, it knows that we will __invoke the object with a unique argument of type Create Email Command, and it knows all the processes needed for creating a new Email.



<?php

declare(strict_types=1);

namespace App\EmailSender\Infrastructure\Http;

use ...

class CreateEmailAction
{
    public function __construct(
        private readonly CreateEmailResponder $responder,
        private readonly CommandBus $commandBus,
    ) {
    }

    public function __invoke(Request $request) : Response
    {
        try {
            $this->commandBus->dispatch(
                new CreateEmailCommand(
                    sender: $request->request->get('sender'),
                    addressee: $request->request->get('addressee'),
                    message: $request->request->get('message'),
                ),
            );
        } catch (Exception $e) {
            $this->responder->loadError($e->getMessage());
        }

        return $this->responder->response();
    }
}


Enter fullscreen mode Exit fullscreen mode

So we can easily inject the Command Bus in an Action (Controller) class and dispatch the command, the action doesn’t know what happens in the core of the application but the Command Bus can ensure that we will send the Command to a proper Handler and it will perform an action. Notice that we know the action that is going to happen, it is provided from the name of the command.

QueryBus

Query architecture

Let's take a look at the model for a Query Bus. We can define a very similar architecture but now we need to return a value if we are asking for something with a query, we will need to introduce the concept of Response. A Response can be a collection of domain objects or it can be a single object or anything, who can determine what is the Response is the Query Handler which knows what information it needs to generate.

Then, we end up with something like this:

Query dir organization



<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface Query
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface QueryBus
{
    public function ask(Query $query) : Response|null;
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface QueryHandler
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface Response
{
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus\Query;

use ...

final class InMemoryQueryBus implements QueryBus
{
    private MessageBus $bus;

    public function __construct(iterable $queryHandlers)
    {
        $this->bus = new MessageBus([
            new HandleMessageMiddleware(
                new HandlersLocator(
                    HandlerBuilder::fromCallables($queryHandlers),
                ),
            ),
        ]);
    }

    public function ask(Query $query): Response|null
    {
        try {
            /** @var HandledStamp $stamp */
            $stamp = $this->bus->dispatch($query)->last(HandledStamp::class);

            return $stamp->getResult();
        } catch (NoHandlerForMessageException $e) {
            throw new InvalidArgumentException(sprintf('The query has not a valid handler: %s', $query::class));
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

We can use the same approach for registering Query Handlers and Queries using the tool for building Handlers and knowing that we have a contract where the __invoke function needs to have just one argument that should be an implementor of the Query interface.

For getting the return value of a Query Handler we need to use the Handled Stamp that is going to mark the message as handled and give us access to the return value that at this point we know should be a Response implementor.

In the config/service.yaml we can tag any instance of Query Handler with internal.query_handler and let the Service Container inject all the tagged to the In Memory Query Bus.



services:
    _defaults:
        autowire: true 
        autoconfigure: true

    _instanceof:
        ...

        App\Shared\Domain\Bus\Query\QueryHandler:
            tags: ['internal.query_handler']
        ...
    ### Buses
    ...

    App\Shared\Domain\Bus\Query\QueryBus:
        class: App\Shared\Infrastructure\Bus\Query\InMemoryQueryBus
        arguments: [ !tagged internal.query_handler ]


Enter fullscreen mode Exit fullscreen mode

All things in place, we can start creating Queries for example:

Query implementation



<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmailQuery implements Query
{
    public function __construct(private readonly int $id)
    {
    }

    public function id() : int
    {
        return $this->id;
    }
}


Enter fullscreen mode Exit fullscreen mode

A simple query containing the id of the email that we are looking for is going to be sent to the Find Email, an implementor for Query Handler. It has enough information about the email for finding it and it can build a response with the information needed.



<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmail implements QueryHandler
{
    public function __construct(private EmailRepository $repository)
    {
    }

    public function __invoke(FindEmailQuery $query) : FindEmailResponse
    {
        $email = $this->repository->findById(
            EmailId::fromInt(
                $query->id(),
            ),
        );

        if ($email === null) {
            throw new InvalidArgumentException('Email unreachable');
        }

        return new FindEmailResponse(
            email: $email,
        );
    }
}


Enter fullscreen mode Exit fullscreen mode


<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmailResponse implements Response
{
    public function __construct(private readonly EmailDto $email)
    {
    }

    public function email() : EmailDto
    {
        return $this->email;
    }
}


Enter fullscreen mode Exit fullscreen mode

Finally, we can use the Query Bus in any Action class



<?php

declare(strict_types=1);

namespace App\EmailSender\Infrastructure\Http;

use ...

class GetEmailAction
{
    public function __construct(
        private GetEmailResponder $responder,
        private QueryBus $queryBus,
    ) {
    }

    public function __invoke(Request $request, int $id) : Response
    {
        try {
            /** @var FindEmailResponse $findEmailResponse */
            $findEmailResponse = $this->queryBus->ask(
                new FindEmailQuery(id: $id)
            );

            $email = $findEmailResponse->email();

            $this->responder->loadEmail($email);
        } catch (Exception $e) {
            $this->responder->loadError($e->getMessage());
        }

        return $this->responder->response();
    }
}


Enter fullscreen mode Exit fullscreen mode

Again, the Action knows what is it looking for but it doesn’t know the complete process for getting it.

Conclusion

We can easily implement a CQRS pattern with components of Symfony making custom Message Buses and defining a model that can be reused along the app. CQRS can help us to separate operations and searching concerns into descriptive Command/Query classes for building better-isolated processes making classes open for changes.

See the code on

GitHub logo AdGARAY / cqrs-symfony

CQRS example with Symfony Messenger

CQRS with Symfony Messenger

Requirements

  • Docker compose

Setup

Initialize containers

$ docker compose up -d
Enter fullscreen mode Exit fullscreen mode

Enter into the php container

$ docker compose exec -it php bash
Enter fullscreen mode Exit fullscreen mode

Install composer dependencies

/var/www/html# $ composer install
Enter fullscreen mode Exit fullscreen mode

Run migrations

/var/www/html# $ php bin/console doctrine:migrations:migrate --no-interaction
Enter fullscreen mode Exit fullscreen mode

Go to localhost:8080

The php image already have xDebug listening on port 9003 with server name serverName=application if you want to go step by step

See the full post on dev.to/adgaray and the spanish version




Top comments (4)

Collapse
 
lobodol profile image
lobodol • Edited

Hi, thanks for your post, that's interesting.
However, you didn't mention the validation of your models. Where should it be done? In the Command objects of the application layer or directly in the models of the domain layer?

Collapse
 
adgaray profile image
Adrian Garay

Hi, sorry for the late response, I think validations of types are going to happen before dispatching the command, taking the request parameters and cast it to the expected types. Validations like the max/min length for the email is going to be placed on the VO EmailAddress.php

Collapse
 
vuylov profile image
Dmitriy

Hi, what about exist entity validation?

Thread Thread
 
adgaray profile image
Adrian Garay

Could you elaborate more?