DEV Community

Adrian Garay
Adrian Garay

Posted on

CQRS con Symfony Messenger (Español)

See the english version

Introducción

Habitualmente usamos la misma estructura de datos para escribir y consultar información en un sistema, para sistemas más grandes esto puede causar estructuras de datos más grandes porque necesita integrar lectura y escritura en el mismo modelo. Por ejemplo, al escribir información podemos necesitar muchas validaciones para asegurarnos de que la información que queremos guardar sea correcta, pedir esta información puede ser diferente y complejo para recuperar datos filtrados o estructuras de datos diferentes para cada caso.

CQRS is a pattern that separates read and update operations for a data store. Implementing CQRS in your application can maximize its performance, scalability, and security. The flexibility created by migrating to CQRS allows a system to better evolve over time and prevents update commands from causing merge conflicts at the domain level.

El patrón de diseño

CQRS separa la estructura de lectura mediante consultas para leer datos y el modelo de escritura mediante comandos para realizar operaciones con los datos

  • Los comandos deben basarse en tareas, lo que significa que debemos centrarnos en la operación del comando, por ejemplo, en una aplicación de entrega al pedir algo, llamaremos a la operación OrderProductCommand en lugar de AddProductToClient o CreateNewOrderProduct, esto también hace que nuestra capa de aplicación sea más consistente.
  • Las consultas nunca modifican la base de datos. Una consulta devuelve un DTO que no encapsula ningún conocimiento del dominio. Necesitamos centrarnos en la información necesaria, no en el comportamiento del dominio.

Beneficios

  • Independent scaling. Permite que los modelos de lectura y escritura se escalen de forma independiente
  • Optimized data schemas. El modelo de lectura puede usar un esquema optimizado para consultas y el modelo de escritura usa un esquema optimizado para actualizaciones.
  • Security. Es más fácil asegurarse de que solo las entidades de dominio correctas realicen escrituras.
  • Separation of concerns. La lógica empresarial compleja entra en el modelo de escritura. El modelo de lectura puede ser simple.

Implementar CQRS con Symfony Messenger

El Componente Messenger ayuda a las aplicaciones a enviar y recibir mensajes hacia/desde otras aplicaciones o mediante colas de mensajes. También nos permite definir buses de mensajes personalizados que definen tipos de mensajes y controladores.

Hablemos de la arquitectura del software.

CommandBus

Command architecture

Hablando de comandos, necesitamos modular una interfaz común que un bus de mensajes pueda administrar y transportar a los controladores, la interfaz de comando termina siendo nuestra interfaz base para cada comando. Cada implementador de Command Handler realizará operaciones con el comando dado, pero el comando en sí mismo no sabe qué operación se realiza. Además, vamos a crear una interfaz para el bus de comandos con el fin de crear diferentes tipos de transportadores para nuestros mensajes (comandos), en este caso, creamos un bus de comandos en memoria, pero podemos ampliar este concepto fácilmente si es necesario (ej: Un trabajo en cola).

Terminaremos con algo así en el código src de nuestra aplicación Symfony.

Command organization

<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface Command
{
}
Enter fullscreen mode Exit fullscreen mode
<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface CommandBus
{
    public function dispatch(Command $command) : void;
}
Enter fullscreen mode Exit fullscreen mode
<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Command;

interface CommandHandler
{
}
Enter fullscreen mode Exit fullscreen mode
<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus\Command;

use ...

final class InMemoryCommandBus implements CommandBus
{
    private MessageBus $bus;

    public function __construct(
        iterable $commandHandlers
    ) {
        $this->bus = new MessageBus([
            new HandleMessageMiddleware(
                new HandlersLocator(
                    HandlerBuilder::fromCallables($commandHandlers),
                ),
            ),
        ]);
    }

    /**
     * @throws Throwable
     */
    public function dispatch(Command $command): void
    {
        try {
            $this->bus->dispatch($command);
        } catch (NoHandlerForMessageException $e) {
            throw new InvalidArgumentException(sprintf('The command has not a valid handler: %s', $command::class));
        } catch (HandlerFailedException $e) {
            throw $e->getPrevious();
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Para la clase In Memory Command Bus, necesitaremos registrar cada controlador de comandos proveniente de la definición del servicio, nos beneficiaremos de una característica de Symfony llamada Service tags provistos desde el Service Container y el Autoconfiguration, nos permite etiquetar un servicio que luego podemos solicitar, en config/services.yaml indicamos el Service Container que deseamos etiquetar cada instancia de la interfaz Command Handler con la etiqueta internal.command_handler y luego declaramos nuestro In Memory Command Bus pasando todos los implementadores de Command Handler como un argumento iterable. El bus de comandos tomará cada controlador de comandos y declarará el comando esperado con el controlador adecuado.

parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    _instanceof:
        App\Shared\Domain\Bus\Command\CommandHandler:
            tags: ['internal.command_handler']
...
    ### Buses
    App\Shared\Domain\Bus\Command\CommandBus:
        class: App\Shared\Infrastructure\Bus\Command\InMemoryCommandBus
        arguments: [!tagged internal.command_handler]
Enter fullscreen mode Exit fullscreen mode

Podemos crear una herramienta de creación de controladores que se encargue de buscar en la función __invoke del implementador del controlador de comandos y tomar el primer tipo de argumento como el comando necesario para llamar al controlador. En este punto, creamos una convención en la que cada controlador de comandos debe poder llamarse como función y tener solo un parámetro con el tipo de comando.

<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus;

use ...

final class HandlerBuilder
{
    /**
     * @throws ReflectionException
     */
    public static function fromCallables(iterable $callables) : array
    {
        $callablesHandlers = [];

        foreach ($callables as $callable) {
            $envelop = self::extractFirstParam($callable);

            if (! array_key_exists($envelop, $callablesHandlers)) {
                $callablesHandlers[self::extractFirstParam($callable)] = [];
            }

            $callablesHandlers[self::extractFirstParam($callable)][] = $callable;
        }

        return $callablesHandlers;
    }

    /**
     * @throws ReflectionException
     */
    private static function extractFirstParam(object|string $class) : string|null
    {
        $reflection = new ReflectionClass($class);
        $method     = $reflection->getMethod('__invoke');

        if ($method->getNumberOfParameters() === 1) {
            return $method->getParameters()[0]->getClass()?->getName();
        }

        return null;
    }
}
Enter fullscreen mode Exit fullscreen mode

Teniendo eso, podemos comenzar a construir nuestros comandos, por ejemplo:

Command implementation

<?php

declare(strict_types=1);

namespace App\EmailSender\Application\Create;

use ...

final class CreateEmailCommand implements Command
{
    public function __construct(
        private readonly string $sender,
        private readonly string $addressee,
        private readonly string $message,
    ) {
    }

    public function sender(): string
    {
        return $this->sender;
    }

    public function addressee(): string
    {
        return $this->addressee;
    }

    public function message(): string
    {
        return $this->message;
    }
}
Enter fullscreen mode Exit fullscreen mode

Podemos crear un comando Create Email Command, contiene la información necesaria para crear un nuevo correo electrónico, pero no conoce el proceso necesario para eso.

<?php

declare(strict_types=1);

namespace App\EmailSender\Application\Create;

use ...

class CreateEmailCommandHandler implements CommandHandler
{
    public function __construct(private EmailRepository $repository)
    {
    }

    public function __invoke(CreateEmailCommand $command) : EmailId {
        $email = Email::createNewEmail(
            sender: new EmailAddress($command->sender()),
            addressee: new EmailAddress($command->addressee()),
            message: new Message($command->message()),
        );

        $this->repository->save($email);

        return $email->id();
    }
}
Enter fullscreen mode Exit fullscreen mode

Creamos un Command Handler para el comando anterior, sabe que la función __invoke del objeto contiene un argumento único del tipo Create Email Command y conoce todos los procesos necesarios para crear un nuevo correo electrónico.

<?php

declare(strict_types=1);

namespace App\EmailSender\Infrastructure\Http;

use ...

class CreateEmailAction
{
    public function __construct(
        private readonly CreateEmailResponder $responder,
        private readonly CommandBus $commandBus,
    ) {
    }

    public function __invoke(Request $request) : Response
    {
        try {
            $this->commandBus->dispatch(
                new CreateEmailCommand(
                    sender: $request->request->get('sender'),
                    addressee: $request->request->get('addressee'),
                    message: $request->request->get('message'),
                ),
            );
        } catch (Exception $e) {
            $this->responder->loadError($e->getMessage());
        }

        return $this->responder->response();
    }
}
Enter fullscreen mode Exit fullscreen mode

Entonces podemos inyectar fácilmente el Command Bus en una clase Action (Controller) y enviar el comando, la acción no sabe lo que sucede en el núcleo de la aplicación, pero el Command Bus puede asegurarse de que le enviaremos el comando a un Handler adecuado y realizará una acción. Tenga en cuenta que sabemos la acción que va a ocurrir, se proporciona a partir del nombre del comando.

QueryBus

Query architecture

Echemos un vistazo al modelo para un Query Bus. Podemos definir una arquitectura muy similar pero ahora necesitamos devolver un valor si estamos pidiendo algo con una consulta, necesitaremos introducir el concepto de Response. Una Response puede ser una colección de objetos de dominio o puede ser un solo objeto o cualquier cosa, quien puede determinar cuál es la Response es el Query Handler que sabe qué información necesita generar.

Entonces, terminamos con algo como esto:

Query organization

<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface Query
{
}
Enter fullscreen mode Exit fullscreen mode
<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface QueryBus
{
    public function ask(Query $query) : Response|null;
}
Enter fullscreen mode Exit fullscreen mode
<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface QueryHandler
{
}
Enter fullscreen mode Exit fullscreen mode
<?php

declare(strict_types=1);

namespace App\Shared\Domain\Bus\Query;

interface Response
{
}
Enter fullscreen mode Exit fullscreen mode
<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus\Query;

use ...

final class InMemoryQueryBus implements QueryBus
{
    private MessageBus $bus;

    public function __construct(iterable $queryHandlers)
    {
        $this->bus = new MessageBus([
            new HandleMessageMiddleware(
                new HandlersLocator(
                    HandlerBuilder::fromCallables($queryHandlers),
                ),
            ),
        ]);
    }

    public function ask(Query $query): Response|null
    {
        try {
            /** @var HandledStamp $stamp */
            $stamp = $this->bus->dispatch($query)->last(HandledStamp::class);

            return $stamp->getResult();
        } catch (NoHandlerForMessageException $e) {
            throw new InvalidArgumentException(sprintf('The query has not a valid handler: %s', $query::class));
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Podemos usar el mismo enfoque para registrar Query Handlers y Queries usando la herramienta para construir Handlers y sabiendo que tenemos un contrato donde la función __invoke necesita tener solo un argumento que debería ser un implementador de la interfaz Query.

Para obtener el valor de retorno de un Query Handler necesitamos usar Handled Stamp que marcará el mensaje como manejado y nos dará acceso al valor de retorno que en este punto sabemos que debería ser un implementador de Response.

En config/service.yaml podemos etiquetar cualquier instancia de Query Handler con internal.query_handler y dejar al Service Container inyectar todos los etiquetados al In Memory Query Bus.

services:
    _defaults:
        autowire: true 
        autoconfigure: true

    _instanceof:
        ...

        App\Shared\Domain\Bus\Query\QueryHandler:
            tags: ['internal.query_handler']
        ...
    ### Buses
    ...

    App\Shared\Domain\Bus\Query\QueryBus:
        class: App\Shared\Infrastructure\Bus\Query\InMemoryQueryBus
        arguments: [ !tagged internal.query_handler ]
Enter fullscreen mode Exit fullscreen mode

Con todo en su lugar, podemos comenzar a crear Queries, por ejemplo:

Query implementation

<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmailQuery implements Query
{
    public function __construct(private readonly int $id)
    {
    }

    public function id() : int
    {
        return $this->id;
    }
}
Enter fullscreen mode Exit fullscreen mode

Una consulta simple que contiene el id del correo electrónico que estamos buscando se enviará al Find Email, un implementador de Query Handler. Tiene suficiente información sobre el correo electrónico para encontrarlo y puede generar una respuesta con la información necesaria.

<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmail implements QueryHandler
{
    public function __construct(private EmailRepository $repository)
    {
    }

    public function __invoke(FindEmailQuery $query) : FindEmailResponse
    {
        $email = $this->repository->findById(
            EmailId::fromInt(
                $query->id(),
            ),
        );

        if ($email === null) {
            throw new InvalidArgumentException('Email unreachable');
        }

        return new FindEmailResponse(
            email: $email,
        );
    }
}
Enter fullscreen mode Exit fullscreen mode
<?php

declare(strict_types=1);

namespace App\EmailSender\Application\FindEmail;

use ...

final class FindEmailResponse implements Response
{
    public function __construct(private readonly EmailDto $email)
    {
    }

    public function email() : EmailDto
    {
        return $this->email;
    }
}
Enter fullscreen mode Exit fullscreen mode

Finalmente, podemos usar Query Bus en cualquier clase de Acción

<?php

declare(strict_types=1);

namespace App\EmailSender\Infrastructure\Http;

use ...

class GetEmailAction
{
    public function __construct(
        private GetEmailResponder $responder,
        private QueryBus $queryBus,
    ) {
    }

    public function __invoke(Request $request, int $id) : Response
    {
        try {
            /** @var FindEmailResponse $findEmailResponse */
            $findEmailResponse = $this->queryBus->ask(
                new FindEmailQuery(id: $id)
            );

            $email = $findEmailResponse->email();

            $this->responder->loadEmail($email);
        } catch (Exception $e) {
            $this->responder->loadError($e->getMessage());
        }

        return $this->responder->response();
    }
}
Enter fullscreen mode Exit fullscreen mode

Nuevamente, la Acción sabe lo que está buscando pero no conoce el proceso completo para obtenerlo.

Conclusion

Podemos implementar fácilmente un patrón CQRS con componentes de Symfony creando buses de mensajes personalizados y definiendo un modelo que se puede reutilizar a lo largo de la aplicación. CQRS puede ayudarnos a separar las operaciones y las inquietudes de búsqueda en clases descriptivas de Command/Query para crear procesos mejor aislados, lo que hace que las clases estén abiertas a cambios.

See the code on

GitHub logo AdGARAY / cqrs-symfony

CQRS example with Symfony Messenger

CQRS with Symfony Messenger

Requirements

  • Docker compose

Setup

Initialize containers

$ docker compose up -d
Enter fullscreen mode Exit fullscreen mode

Enter into the php container

$ docker compose exec -it php bash
Enter fullscreen mode Exit fullscreen mode

Install composer dependencies

/var/www/html# $ composer install
Enter fullscreen mode Exit fullscreen mode

Run migrations

/var/www/html# $ php bin/console doctrine:migrations:migrate --no-interaction
Enter fullscreen mode Exit fullscreen mode

Go to localhost:8080

The php image already have xDebug listening on port 9003 with server name serverName=application if you want to go step by step

See the full post on dev.to/adgaray and the spanish version

Top comments (0)