Ve la versión en español
Introduction
Habitually we use the same data structure for writing and querying information on a system, for larger systems this can cause bigger data structures because it needs to integrate read and write in the same model. For example, when writing information we may need a lot of validations in order to ensure the information that we are persisting is correct, asking for this information can be different and complex in order to retrieve filtered data or different data structures for each case.
CQRS is a pattern that separates read and update operations for a data store. Implementing CQRS in your application can maximize its performance, scalability, and security. The flexibility created by migrating to CQRS allows a system to better evolve over time and prevents update commands from causing merge conflicts at the domain level.
The pattern
CQRS separates the read structure using queries in order to read data and the write model using commands to make operations with the data
- Commands should be task-based, which means that we need to focus on the operation of the command, for example, on a delivery app when ordering something we will name the operation OrderProductCommand instead of AddProductToClient or CreateNewOrderProduct this also makes our application layer more consistent
- Queries never modify the database. A query returns a DTO that does not encapsulate any domain knowledge. We need to focus on the information needed not on the domain behavior.
Benefits
- Independent scaling. It allows the read and write models to scale independently
- Optimized data schemas. The read model can use a schema that is optimized for queries, and the write model uses a schema that is optimized for updates.
- Security. It's easier to ensure that only the right domain entities are performing writes.
- Separation of concerns. The complex business logic goes into the write model. The read model can be simple.
Implement CQRS with a Symfony Messenger
The messenger component helps applications send and receive messages to/from other applications or via message queues. It also allows us to define custom message buses defining message types and handlers.
Let's talk about the software architecture.
CommandBus
Talking about commands we need to modulate a common interface that a message bus can manage and transport to handlers, the Command interface ends up being our base interface for each command. Each Command Handler implementor is going to make operations with the command given but the command itself doesn’t know what operation is performed. Also, we are going to create an interface for the Command Bus in order to create different kinds of Transporters for our messages (commands), in this case, we create one In Memory Command Bus but we can extend this concept easily if needed (ex: A queued work).
We will arrive with something like this on the src code of our Symfony application.
<?php
declare(strict_types=1);
namespace App\Shared\Domain\Bus\Command;
interface Command
{
}
<?php
declare(strict_types=1);
namespace App\Shared\Domain\Bus\Command;
interface CommandBus
{
public function dispatch(Command $command) : void;
}
<?php
declare(strict_types=1);
namespace App\Shared\Domain\Bus\Command;
interface CommandHandler
{
}
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Bus\Command;
use ...
final class InMemoryCommandBus implements CommandBus
{
private MessageBus $bus;
public function __construct(
iterable $commandHandlers
) {
$this->bus = new MessageBus([
new HandleMessageMiddleware(
new HandlersLocator(
HandlerBuilder::fromCallables($commandHandlers),
),
),
]);
}
/**
* @throws Throwable
*/
public function dispatch(Command $command): void
{
try {
$this->bus->dispatch($command);
} catch (NoHandlerForMessageException $e) {
throw new InvalidArgumentException(sprintf('The command has not a valid handler: %s', $command::class));
} catch (HandlerFailedException $e) {
throw $e->getPrevious();
}
}
}
For the In Memory Command Bus class we will need to register each Command Handler coming from the service definition, we will profit from a Symfony feature named Service tags provided from the Service Container and the Autoconfiguration, it allows us to tag a service that we can ask for it later, in config/services.yaml we indicate the Service Container to tag each instance of the Command Handler interface with the tag internal.command_handler
and later we declare our In Memory Command Bus passing all the implementors of Command Handler as an iterable argument. The Command Bus will take each Command Handler and declare the Command Expected with the appropriate Handler.
parameters:
services:
_defaults:
autowire: true
autoconfigure: true
_instanceof:
App\Shared\Domain\Bus\Command\CommandHandler:
tags: ['internal.command_handler']
...
### Buses
App\Shared\Domain\Bus\Command\CommandBus:
class: App\Shared\Infrastructure\Bus\Command\InMemoryCommandBus
arguments: [!tagged internal.command_handler]
We can create a Handler Builder tool that is responsible for searching on the __invoke of the Command Handler implementor and take the first argument type as the command needed for calling the handler. At this point, we create a convention where each Command Handler needs to be callable and have only one parameter with the type of the Command.
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Bus;
use ...
final class HandlerBuilder
{
/**
* @throws ReflectionException
*/
public static function fromCallables(iterable $callables) : array
{
$callablesHandlers = [];
foreach ($callables as $callable) {
$envelop = self::extractFirstParam($callable);
if (! array_key_exists($envelop, $callablesHandlers)) {
$callablesHandlers[self::extractFirstParam($callable)] = [];
}
$callablesHandlers[self::extractFirstParam($callable)][] = $callable;
}
return $callablesHandlers;
}
/**
* @throws ReflectionException
*/
private static function extractFirstParam(object|string $class) : string|null
{
$reflection = new ReflectionClass($class);
$method = $reflection->getMethod('__invoke');
if ($method->getNumberOfParameters() === 1) {
return $method->getParameters()[0]->getClass()?->getName();
}
return null;
}
}
Having that, we can start building our commands, for example:
<?php
declare(strict_types=1);
namespace App\EmailSender\Application\Create;
use ...
final class CreateEmailCommand implements Command
{
public function __construct(
private readonly string $sender,
private readonly string $addressee,
private readonly string $message,
) {
}
public function sender(): string
{
return $this->sender;
}
public function addressee(): string
{
return $this->addressee;
}
public function message(): string
{
return $this->message;
}
}
We can create a Create Email Command, it contains the information needed to create a new email but it doesn’t know the process needed for that.
<?php
declare(strict_types=1);
namespace App\EmailSender\Application\Create;
use ...
class CreateEmailCommandHandler implements CommandHandler
{
public function __construct(private EmailRepository $repository)
{
}
public function __invoke(CreateEmailCommand $command) : EmailId {
$email = Email::createNewEmail(
sender: new EmailAddress($command->sender()),
addressee: new EmailAddress($command->addressee()),
message: new Message($command->message()),
);
$this->repository->save($email);
return $email->id();
}
}
We create a handler for the previous Command, it knows that we will __invoke the object with a unique argument of type Create Email Command, and it knows all the processes needed for creating a new Email.
<?php
declare(strict_types=1);
namespace App\EmailSender\Infrastructure\Http;
use ...
class CreateEmailAction
{
public function __construct(
private readonly CreateEmailResponder $responder,
private readonly CommandBus $commandBus,
) {
}
public function __invoke(Request $request) : Response
{
try {
$this->commandBus->dispatch(
new CreateEmailCommand(
sender: $request->request->get('sender'),
addressee: $request->request->get('addressee'),
message: $request->request->get('message'),
),
);
} catch (Exception $e) {
$this->responder->loadError($e->getMessage());
}
return $this->responder->response();
}
}
So we can easily inject the Command Bus in an Action (Controller) class and dispatch the command, the action doesn’t know what happens in the core of the application but the Command Bus can ensure that we will send the Command to a proper Handler and it will perform an action. Notice that we know the action that is going to happen, it is provided from the name of the command.
QueryBus
Let's take a look at the model for a Query Bus. We can define a very similar architecture but now we need to return a value if we are asking for something with a query, we will need to introduce the concept of Response. A Response can be a collection of domain objects or it can be a single object or anything, who can determine what is the Response is the Query Handler which knows what information it needs to generate.
Then, we end up with something like this:
<?php
declare(strict_types=1);
namespace App\Shared\Domain\Bus\Query;
interface Query
{
}
<?php
declare(strict_types=1);
namespace App\Shared\Domain\Bus\Query;
interface QueryBus
{
public function ask(Query $query) : Response|null;
}
<?php
declare(strict_types=1);
namespace App\Shared\Domain\Bus\Query;
interface QueryHandler
{
}
<?php
declare(strict_types=1);
namespace App\Shared\Domain\Bus\Query;
interface Response
{
}
<?php
declare(strict_types=1);
namespace App\Shared\Infrastructure\Bus\Query;
use ...
final class InMemoryQueryBus implements QueryBus
{
private MessageBus $bus;
public function __construct(iterable $queryHandlers)
{
$this->bus = new MessageBus([
new HandleMessageMiddleware(
new HandlersLocator(
HandlerBuilder::fromCallables($queryHandlers),
),
),
]);
}
public function ask(Query $query): Response|null
{
try {
/** @var HandledStamp $stamp */
$stamp = $this->bus->dispatch($query)->last(HandledStamp::class);
return $stamp->getResult();
} catch (NoHandlerForMessageException $e) {
throw new InvalidArgumentException(sprintf('The query has not a valid handler: %s', $query::class));
}
}
}
We can use the same approach for registering Query Handlers and Queries using the tool for building Handlers and knowing that we have a contract where the __invoke function needs to have just one argument that should be an implementor of the Query interface.
For getting the return value of a Query Handler we need to use the Handled Stamp that is going to mark the message as handled and give us access to the return value that at this point we know should be a Response implementor.
In the config/service.yaml we can tag any instance of Query Handler with internal.query_handler
and let the Service Container inject all the tagged to the In Memory Query Bus.
services:
_defaults:
autowire: true
autoconfigure: true
_instanceof:
...
App\Shared\Domain\Bus\Query\QueryHandler:
tags: ['internal.query_handler']
...
### Buses
...
App\Shared\Domain\Bus\Query\QueryBus:
class: App\Shared\Infrastructure\Bus\Query\InMemoryQueryBus
arguments: [ !tagged internal.query_handler ]
All things in place, we can start creating Queries for example:
<?php
declare(strict_types=1);
namespace App\EmailSender\Application\FindEmail;
use ...
final class FindEmailQuery implements Query
{
public function __construct(private readonly int $id)
{
}
public function id() : int
{
return $this->id;
}
}
A simple query containing the id of the email that we are looking for is going to be sent to the Find Email, an implementor for Query Handler. It has enough information about the email for finding it and it can build a response with the information needed.
<?php
declare(strict_types=1);
namespace App\EmailSender\Application\FindEmail;
use ...
final class FindEmail implements QueryHandler
{
public function __construct(private EmailRepository $repository)
{
}
public function __invoke(FindEmailQuery $query) : FindEmailResponse
{
$email = $this->repository->findById(
EmailId::fromInt(
$query->id(),
),
);
if ($email === null) {
throw new InvalidArgumentException('Email unreachable');
}
return new FindEmailResponse(
email: $email,
);
}
}
<?php
declare(strict_types=1);
namespace App\EmailSender\Application\FindEmail;
use ...
final class FindEmailResponse implements Response
{
public function __construct(private readonly EmailDto $email)
{
}
public function email() : EmailDto
{
return $this->email;
}
}
Finally, we can use the Query Bus in any Action class
<?php
declare(strict_types=1);
namespace App\EmailSender\Infrastructure\Http;
use ...
class GetEmailAction
{
public function __construct(
private GetEmailResponder $responder,
private QueryBus $queryBus,
) {
}
public function __invoke(Request $request, int $id) : Response
{
try {
/** @var FindEmailResponse $findEmailResponse */
$findEmailResponse = $this->queryBus->ask(
new FindEmailQuery(id: $id)
);
$email = $findEmailResponse->email();
$this->responder->loadEmail($email);
} catch (Exception $e) {
$this->responder->loadError($e->getMessage());
}
return $this->responder->response();
}
}
Again, the Action knows what is it looking for but it doesn’t know the complete process for getting it.
Conclusion
We can easily implement a CQRS pattern with components of Symfony making custom Message Buses and defining a model that can be reused along the app. CQRS can help us to separate operations and searching concerns into descriptive Command/Query classes for building better-isolated processes making classes open for changes.
See the code on
AdGARAY / cqrs-symfony
CQRS example with Symfony Messenger
CQRS with Symfony Messenger
Requirements
- Docker compose
Setup
Initialize containers
$ docker compose up -d
Enter into the php container
$ docker compose exec -it php bash
Install composer dependencies
/var/www/html# $ composer install
Run migrations
/var/www/html# $ php bin/console doctrine:migrations:migrate --no-interaction
Go to localhost:8080
The php image already have xDebug listening on port 9003
with server name serverName=application
if you want to go step by step
See the full post on dev.to/adgaray and the spanish version
Top comments (4)
Hi, thanks for your post, that's interesting.
However, you didn't mention the validation of your models. Where should it be done? In the Command objects of the application layer or directly in the models of the domain layer?
Hi, sorry for the late response, I think validations of types are going to happen before dispatching the command, taking the request parameters and cast it to the expected types. Validations like the max/min length for the email is going to be placed on the VO EmailAddress.php
Hi, what about exist entity validation?
Could you elaborate more?