DEV Community

Cover image for Java Quarkus CQRS and EventSourcing microservice example πŸ‘‹πŸ’«βœ¨
Alexander
Alexander

Posted on

Java Quarkus CQRS and EventSourcing microservice example πŸ‘‹πŸ’«βœ¨

πŸ‘¨β€πŸ’» Full list what has been used:

Source code you can find in GitHub repository.
The main idea of this project is the implementation of Event Sourcing and CQRS using Java, Quarkus with reactive Vertx, Postgresql and Kafka.
Previously have written same articles where implemented the same microservice using Go and EventStoreDB,
and Spring,as written before, repeat here, think EventStoreDB is the best choice for event sourcing, but in real life at some projects we usually have business restrictions and for example
usage of the EventStoreDB can be not allowed, in this case, think postgres and kafka is good alternative for implementing our own event store.
Didn't write in this article about Event Sourcing and CQRS patterns, the best place to read is microservices.io,blog and documentation of this article is very good too,
and as written in the previous article, highly recommend Alexey Zimarev "Hands-on Domain-Driven Design with .NET Core" book and also his blog.

Reactive approach of Quarkus Vertex or Spring Reactor has some tradeoffs, not all libraries has good integration with it, like tracing, it's harder to debug,
writing complicated business logic in reactive functional way is very specific and feels more difficult, and not only for writing and for reading too.
Quarkus Vertex with Mutiny in my opinion has much better reactive API,
documentation and production ready libraries.

In this project we have microservice events store implemented using [PostgreSQL(https://www.eventstore.com/),**Vertx** reactive PostgreSQL clientand Kafka Reactive, for projections used quarkus-mongodb-client and communicate by REST using RESTEasy Reactive.

For postgresql Quarkus has reactive Vertx Reactive SQL Clients
and Reactive Hibernate with Panache,
the first one is good especially with Mutiny pgPool and flyway as migration tool.

All UI interfaces will be available on ports:

Swagger UI: http://localhost:8006/swagger-ui/index.html

Swagger

Jaeger UI: http://localhost:16686

Jaeger

Prometheus UI: http://localhost:9090

Prometheus

Grafana UI: http://localhost:3005

Grafana

Docker compose file for this project:

version: "3.9"

services:
  postgresql:
    image: postgres:14.2
    container_name: postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=microservices
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zookeeper:
    image: 'bitnami/zookeeper:3.8.0'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka:
    image: 'bitnami/kafka:3.0.1'
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "./kafka_data:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    networks: [ "microservices" ]

  redis:
    image: redis:6-alpine
    restart: always
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks: [ "microservices" ]

  mongodb:
    image: docker.io/bitnami/mongodb:4.4
    restart: always
    container_name: microservices_mongo
    environment:
      MONGODB_ROOT_USER: admin
      MONGODB_ROOT_PASSWORD: admin
      BITNAMI_DEBUG: "false"
      ALLOW_EMPTY_PASSWORD: "no"
      MONGODB_SYSTEM_LOG_VERBOSITY: "0"
      MONGODB_DISABLE_SYSTEM_LOG: "no"
      MONGODB_DISABLE_JAVASCRIPT: "no"
      MONGODB_ENABLE_JOURNAL: "yes"
      MONGODB_ENABLE_IPV6: "no"
      MONGODB_ENABLE_DIRECTORY_PER_DB: "no"
      MONGODB_DATABASE: "microservices"
    volumes:
      - ./mongodb_data_container:/data/db
    ports:
      - "27017:27017"
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices
Enter fullscreen mode Exit fullscreen mode

In Event Sourcing we are storing the history of all the actions that have occurred to an entity and deriving the state from that,
it is possible to read back through that history in order to establish what the state was at a given point in time.
It is a pattern for storing data as events in an append-only log.

Every new event is a change.
The AggregateRoot class should keep track of all the changes that happen during the command execution flow,
so we can persist those changes in the command handler.
Aggregates take the current state, verify the business rules for the particular operation
and apply the business logic that returns the new state. The important part of this process is storing all or nothing.
All aggregated data needs to be saved successfully. If one rule or operation fails then the whole state change is rejected.
AggregateRoot can be implemented in different ways, the main methods is load events - apply and raise changes.
When we fetch the aggregate from the database, instead of reading its state as one record in a table or document,
we read all events that were saved before and call when method for each.
After all these steps, we will recover all the history of a given aggregate.
By doing this, we will be bringing our aggregate to its latest state.

@Data
@NoArgsConstructor
public abstract class AggregateRoot {

    protected String id;
    protected String type;
    protected long version;
    protected final List<Event> changes = new ArrayList<>();

    public AggregateRoot(final String id, final String aggregateType) {
        this.id = id;
        this.type = aggregateType;
    }


    public abstract void when(final Event event);

    public void load(final List<Event> events) {
        events.forEach(event -> {
            this.validateEvent(event);
            this.raiseEvent(event);
            this.version++;
        });
    }

    public void apply(final Event event) {
        this.validateEvent(event);
        event.setAggregateType(this.type);

        when(event);
        changes.add(event);

        this.version++;
        event.setVersion(this.version);
    }

    public void raiseEvent(final Event event) {
        this.validateEvent(event);

        event.setAggregateType(this.type);
        when(event);

        this.version++;
    }

    public void clearChanges() {
        this.changes.clear();
    }

    public void toSnapshot() {
        this.clearChanges();
    }

    private void validateEvent(final Event event) {
        if (Objects.isNull(event) || !event.getAggregateId().equals(this.id))
            throw new InvalidEventException(event.toString());
    }
}
Enter fullscreen mode Exit fullscreen mode

An event represents a fact that took place in the domain. They are the source of truth; your current state is derived from the events.
Events are immutable and represent the business facts.
In Event Sourcing, each operation made on the aggregate should result with the new event.
An event represents a fact that took place in the domain. They are the source of truth, your current state is derived from the events.
They are immutable and represent the business facts.
It means that we never change or remove anything in the database, and we only append new events.

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Event {
    private UUID id;
    private String aggregateId;
    private String eventType;
    private String aggregateType;
    private long version;
    private byte[] data;
    private byte[] metaData;
    private LocalDateTime timeStamp;


    public Event(String eventType, String aggregateType) {
        this.id = UUID.randomUUID();
        this.eventType = eventType;
        this.aggregateType = aggregateType;
        this.timeStamp = LocalDateTime.now();
    }
}
Enter fullscreen mode Exit fullscreen mode

Snapshots are the representation of the current state at a certain "point in time".
If we follow the Event Sourcing pattern literally, we need to get all these transactions to calculate the current account's balance.
This won't be efficient. Your first thought to make this more efficient may be caching the latest state somewhere.
Instead of retrieving all these events, we could retrieve one record and use it for our business logic. This is a Snapshot.
The general logic: read the snapshot (if it exists), then read events from the EventStoreDB,
if a snapshot exists, read events since the last stream revision of which snapshot was created, otherwise, read all events.
In our microservice we are storing the snapshot of every N number of events.
Snapshots can be not needed as performance may be good enough.

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Snapshot {

    private UUID id;
    private String aggregateId;
    private String aggregateType;
    private byte[] data;
    private byte[] metaData;
    private long version;
    private LocalDateTime timeStamp;
}
Enter fullscreen mode Exit fullscreen mode

Event store is a key element of a system. Each change that took place in the domain is recorded in the database.
It is specifically designed to store the history of changes, the state is represented by the append-only log of events.
The events are immutable: they cannot be changed.
Implementation of AggregateStore is Load, Save, and Exists methods.
Load and Save accept aggregate then load or apply events using EventStoreDB client.
The Load method: find out the stream name for an aggregate, read all the events from the aggregate stream,
loop through all the events, and call the RaiseEvent handler for each of them.
And the Save method persists aggregates by saving the history of changes, handling concurrency,
when you retrieve a stream from EventStoreDB, you take note of the current version number,
then when you save it back you can determine if somebody else has modified the record in the meantime.
The reactive postgresql event store implementation:

@ApplicationScoped
public class EventStore implements EventStoreDB {

    private final static Logger logger = Logger.getLogger(EventStore.class);

    private final int SNAPSHOT_FREQUENCY = 3;
    private final static String SAVE_EVENTS_QUERY = "INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) " +
            "values ($1, $2, $3, $4, $5, $6, now())";
    private final static String LOAD_EVENTS_QUERY = "select event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp" +
            " from events e where e.aggregate_id = $1 and e.version > $2 ORDER BY e.version ASC";
    private final static String HANDLE_CONCURRENCY_QUERY = "SELECT aggregate_id FROM events e WHERE e.aggregate_id = $1 LIMIT 1 FOR UPDATE";
    private final static String SAVE_SNAPSHOT_QUERY = "INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) " +
            "VALUES ($1, $2, $3, $4, $5, now()) " +
            "ON CONFLICT (aggregate_id) " +
            "DO UPDATE SET data = $3, version = $5, timestamp = now()";
    private final static String EXISTS_QUERY = "SELECT e.aggregate_id FROM events e WHERE e.aggregate_id = $1 LIMIT 1";
    private final static String GET_SNAPSHOT_QUERY = "select snapshot_id, aggregate_id, aggregate_type, data, metadata, version, timestamp from snapshots s where s.aggregate_id = $1";


    @Inject
    EventBus eventBus;

    @Inject
    PgPool pgPool;

    @Override
    @Traced
    public <T extends AggregateRoot> Uni<Void> save(T aggregate) {
        final List<Event> changes = new ArrayList<>(aggregate.getChanges());

        return pgPool.withTransaction(client -> handleConcurrency(client, aggregate.getId())
                .chain(v -> saveEvents(client, aggregate.getChanges()))
                .chain(s -> aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0 ? saveSnapshot(client, aggregate) : Uni.createFrom().item(s))
                .chain(a -> eventBus.publish(changes))
                .onFailure().invoke(ex -> logger.error("(save) eventBus.publish ex", ex))
                .onItem().invoke(success -> logger.infof("save aggregate success: %s", success)));
    }

    @Override
    @Traced
    public <T extends AggregateRoot> Uni<T> load(String aggregateId, Class<T> aggregateType) {
        return pgPool.withTransaction(client -> this.getSnapshot(client, aggregateId)
                        .onItem().transform(snapshot -> getSnapshotFromClass(snapshot, aggregateId, aggregateType)))
                .chain(a -> this.loadEvents(a.getId(), a.getVersion())
                        .chain(events -> raiseAggregateEvents(a, events)));
    }

    @Traced
    @Override
    public Uni<RowSet<Row>> saveEvents(SqlConnection client, List<Event> events) {
        final List<io.vertx.mutiny.sqlclient.Tuple> tupleList = events.stream().map(event -> Tuple.of(
                event.getAggregateId(),
                event.getAggregateType(),
                event.getEventType(),
                Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
                Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
                event.getVersion())).toList();

        if (tupleList.size() == 1) {
            return client.preparedQuery(SAVE_EVENTS_QUERY).execute(tupleList.get(0))
                    .onFailure().invoke(ex -> logger.error("(SAVE_EVENTS_QUERY) ex:", ex))
                    .onItem().invoke(result -> logger.infof("(saveEvents) execute result: %s", result.rowCount()));
        }

        return client.preparedQuery(SAVE_EVENTS_QUERY).executeBatch(tupleList)
                .onFailure().invoke(ex -> logger.error("(executeBatch) ex:", ex))
                .onItem().invoke(result -> logger.infof("(saveEvents) execute result: %s", result.rowCount()));
    }

    @Override
    @Traced
    public Uni<RowSet<Event>> loadEvents(String aggregateId, long version) {
        return pgPool.preparedQuery(LOAD_EVENTS_QUERY).mapping(EventStore::eventFromRow)
                .execute(Tuple.of(aggregateId, version))
                .onFailure().invoke(ex -> logger.error("(loadEvents) preparedQuery ex:", ex));
    }


    @Traced
    private Uni<RowSet<Row>> handleConcurrency(SqlConnection client, String aggregateID) {
        return client.preparedQuery(HANDLE_CONCURRENCY_QUERY).execute(Tuple.of(aggregateID))
                .onFailure().invoke(ex -> logger.error("handleConcurrency ex", ex));
    }


    @Traced
    private <T extends AggregateRoot> Uni<RowSet<Row>> saveSnapshot(SqlConnection client, T aggregate) {
        aggregate.toSnapshot();
        final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);

        return client.preparedQuery(SAVE_SNAPSHOT_QUERY).execute(Tuple.of(
                        snapshot.getAggregateId(),
                        snapshot.getAggregateType(),
                        Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
                        Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
                        snapshot.getVersion()))
                .onFailure().invoke(ex -> logger.error("(saveSnapshot) preparedQuery execute:", ex));
    }

    @Traced
    private Uni<Snapshot> getSnapshot(SqlConnection client, String aggregateID) {
        return client.preparedQuery(GET_SNAPSHOT_QUERY).mapping(EventStore::snapshotFromRow)
                .execute(Tuple.of(aggregateID))
                .onFailure().invoke(ex -> logger.error("(getSnapshot) preparedQuery ex:", ex))
                .onItem().transform(result -> result.size() == 0 ? null : result.iterator().next())
                .onItem().invoke(snapshot -> logger.infof("(getSnapshot) onSuccess snapshot version: %s", Optional.ofNullable(snapshot)
                        .map(Snapshot::getVersion)));
    }

    @Traced
    private <T extends AggregateRoot> T getAggregate(final String aggregateId, final Class<T> aggregateType) {
        try {
            return aggregateType.getConstructor(String.class).newInstance(aggregateId);
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
                 NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
    }

    @Traced
    private <T extends AggregateRoot> T getSnapshotFromClass(Snapshot snapshot, String aggregateId, Class<T> aggregateType) {
        if (snapshot == null) {
            final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
            return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
        }
        return EventSourcingUtils.aggregateFromSnapshot(snapshot, aggregateType);
    }


    @Traced
    private <T extends AggregateRoot> Uni<T> raiseAggregateEvents(T aggregate, RowSet<Event> events) {
        if (events != null && events.rowCount() > 0) {
            events.forEach(event -> {
                aggregate.raiseEvent(event);
                logger.infof("(load) loadEvents raiseEvent event version: %s", event.getVersion());
            });
            return Uni.createFrom().item(aggregate);
        } else {
            return (aggregate.getVersion() == 0) ? Uni.createFrom().failure(new BankAccountNotFoundException(aggregate.getId())) : Uni.createFrom().item(aggregate);
        }
    }

    @Override
    @Traced
    public Uni<Boolean> exists(String aggregateId) {
        return pgPool.preparedQuery(EXISTS_QUERY).execute(Tuple.of(aggregateId))
                .map(m -> m.rowCount() > 0)
                .onFailure().invoke(ex -> logger.error("(exists) aggregateId: %s, ex:", aggregateId, ex));
    }


    private static Snapshot snapshotFromRow(Row row) {
        return Snapshot.builder()
                .id(row.getUUID(SNAPSHOT_ID))
                .aggregateId(row.getString(AGGREGATE_ID))
                .aggregateType(row.getString(AGGREGATE_TYPE))
                .data(row.getBuffer(DATA).getBytes())
                .metaData(row.getBuffer(METADATA).getBytes())
                .version(row.getLong(VERSION))
                .timeStamp(row.getLocalDateTime(TIMESTAMP))
                .build();
    }

    private static Event eventFromRow(Row row) {
        return Event.builder()
                .id(row.getUUID(EVENT_ID))
                .aggregateId(row.getString(AGGREGATE_ID))
                .aggregateType(row.getString(AGGREGATE_TYPE))
                .eventType(row.getString(EVENT_TYPE))
                .data(row.getBuffer(DATA).getBytes())
                .metaData(row.getBuffer(METADATA).getBytes())
                .version(row.getLong(VERSION))
                .timeStamp(row.getOffsetDateTime(TIMESTAMP).toZonedDateTime())
                .build();
    }
}
Enter fullscreen mode Exit fullscreen mode

For the next step let's create a bank account aggregate:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class BankAccountAggregate extends AggregateRoot {


    public static final String AGGREGATE_TYPE = "BankAccountAggregate";

    public BankAccountAggregate(String id) {
        super(id, AGGREGATE_TYPE);
    }

    private String email;
    private String userName;
    private String address;
    private BigDecimal balance;


    @Override
    public void when(Event event) {
        switch (event.getEventType()) {
            case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
            case EmailChangedEvent.EMAIL_CHANGED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
            case AddressUpdatedEvent.ADDRESS_UPDATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
            case BalanceDepositedEvent.BALANCE_DEPOSITED ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
            default -> throw new InvalidEventTypeException(event.getEventType());
        }
    }

    private void handle(final BankAccountCreatedEvent event) {
        this.email = event.getEmail();
        this.userName = event.getUserName();
        this.address = event.getAddress();
        this.balance = BigDecimal.valueOf(0);
    }

    private void handle(final EmailChangedEvent event) {
        Objects.requireNonNull(event.getNewEmail());
        if (event.getNewEmail().isBlank()) throw new InvalidEmailException();
        this.email = event.getNewEmail();
    }

    private void handle(final AddressUpdatedEvent event) {
        Objects.requireNonNull(event.getNewAddress());
        if (event.getNewAddress().isBlank()) throw new InvalidAddressException();
        this.address = event.getNewAddress();
    }

    private void handle(final BalanceDepositedEvent event) {
        Objects.requireNonNull(event.getAmount());
        this.balance = this.balance.add(event.getAmount());
    }

    public void createBankAccount(String email, String address, String userName) {
        final var data = BankAccountCreatedEvent.builder()
                .aggregateId(id)
                .email(email)
                .address(address)
                .userName(userName)
                .build();

        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1, dataBytes, null);
        this.apply(event);
    }

    public void changeEmail(String email) {
        final var data = EmailChangedEvent.builder().aggregateId(id).newEmail(email).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(EmailChangedEvent.EMAIL_CHANGED_V1, dataBytes, null);
        apply(event);

    }

    public void changeAddress(String newAddress) {
        final var data = AddressUpdatedEvent.builder().aggregateId(id).newAddress(newAddress).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(AddressUpdatedEvent.ADDRESS_UPDATED_V1, dataBytes, null);
        apply(event);
    }

    public void depositBalance(BigDecimal amount) {
        final var data = BalanceDepositedEvent.builder().aggregateId(id).amount(amount).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(BalanceDepositedEvent.BALANCE_DEPOSITED, dataBytes, null);
        apply(event);
    }
}
Enter fullscreen mode Exit fullscreen mode

Swagger

Our microservice accepts HTTP requests:
For swagger used Swagger OpenAPI 3.
The bank account REST controller, which accept requests, validate it using Hibernate Validator,
then call a command or query service.
The main reason for CQRS gaining popularity is the ability to handle reads
and writes separately due to severe differences in optimization techniques for those much more distinct operations.

@Path(value = "/api/v1/bank")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class BankAccountResource {
    private final static Logger logger = Logger.getLogger(BankAccountResource.class);

    @Inject
    BankAccountCommandService commandService;

    @Inject
    BankAccountQueryService queryService;


    @POST
    @Traced
    @Retry(maxRetries = 3, delay = 300)
    @Timeout(value = 5000)
    @CircuitBreaker(requestVolumeThreshold = 30, delay = 3000, failureRatio = 0.6)
    public Uni<Response> createBanAccount(@Valid CreateBankAccountRequestDTO dto) {
        final var command = new CreateBankAccountCommand(dto.email(), dto.userName(), dto.address());
        logger.infof("CreateBankAccountCommand: %s", command);
        return commandService.handle(command).onItem().transform(id -> Response.status(Response.Status.CREATED).entity(id).build());
    }

    @POST
    @Path("/email/{aggregateID}")
    @Traced
    @Retry(maxRetries = 3, delay = 300)
    @Timeout(value = 5000)
    @CircuitBreaker(requestVolumeThreshold = 30, delay = 3000, failureRatio = 0.6)
    public Uni<Response> updateEmail(@PathParam("aggregateID") String aggregateID, @Valid ChangeEmailRequestDTO dto) {
        final var command = new ChangeEmailCommand(aggregateID, dto.newEmail());
        logger.infof("ChangeEmailCommand: %s", command);
        return commandService.handle(command).onItem().transform(id -> Response.status(Response.Status.NO_CONTENT).build());
    }

    @POST
    @Path("/address/{aggregateID}")
    @Traced
    @Retry(maxRetries = 3, delay = 300)
    @Timeout(value = 5000)
    @CircuitBreaker(requestVolumeThreshold = 30, delay = 3000, failureRatio = 0.6)
    public Uni<Response> changeAddress(@PathParam("aggregateID") String aggregateID, @Valid ChangeAddressRequestDTO dto) {
        final var command = new ChangeAddressCommand(aggregateID, dto.newAddress());
        logger.infof("ChangeAddressCommand: %s", command);
        return commandService.handle(command).onItem().transform(id -> Response.status(Response.Status.NO_CONTENT).build());
    }

    @POST
    @Path("/deposit/{aggregateID}")
    @Traced
    @Retry(maxRetries = 3, delay = 300)
    @Timeout(value = 5000)
    @CircuitBreaker(requestVolumeThreshold = 30, delay = 3000, failureRatio = 0.6)
    public Uni<Response> depositAmount(@PathParam("aggregateID") String aggregateID, @Valid DepositAmountRequestDTO dto) {
        final var command = new DepositAmountCommand(aggregateID, dto.amount());
        logger.infof("DepositAmountCommand: %s", command);
        return commandService.handle(command).onItem().transform(id -> Response.status(Response.Status.NO_CONTENT).build());
    }

    @GET
    @Path("{aggregateID}")
    @Traced
    @Retry(maxRetries = 3, delay = 300)
    @Timeout(value = 3000)
    @CircuitBreaker(requestVolumeThreshold = 30, delay = 3000, failureRatio = 0.6)
    public Uni<Response> getBanAccount(@PathParam("aggregateID") String aggregateID) {
        final var query = new GetBankAccountByIDQuery(aggregateID);
        logger.infof("(HTTP getBanAccount) GetBankAccountByIDQuery: %s", query);
        return queryService.handle(query).onItem().transform(aggregate -> Response.status(Response.Status.OK).entity(aggregate).build());
    }

    @GET
    @Path("/balance")
    @Traced
    @Retry(maxRetries = 3, delay = 300)
    @Timeout(value = 3000)
    @CircuitBreaker(requestVolumeThreshold = 30, delay = 3000, failureRatio = 0.6)
    public Uni<Response> getAllByBalance(@QueryParam("page") Optional<Integer> page, @QueryParam("size") Optional<Integer> size) {
        final var query = new FindAllByBalanceQuery(Page.of(page.orElse(0), size.orElse(5)));
        return queryService.handle(query).onItem().transform(result -> Response.status(Response.Status.OK).entity(result).build());
    }
}
Enter fullscreen mode Exit fullscreen mode

The main attribute of a command is that when the command gets successfully executed, the system transitions to a new state.
Command handlers are responsible for handling commands, mutating state or doing other side effects.
The command service handle cqrs commands, load aggregate from event store and call its methods depend on business logic of the application,
aggregate applies these changes, and then we save these events changes list in event store.
For microservice resilience used here SmallRye Fault Tolerance.

Jaeger

@ApplicationScoped
public class BankAccountCommandHandler implements BankAccountCommandService {

    private final static Logger logger = Logger.getLogger(BankAccountCommandHandler.class);

    @Inject
    EventStoreDB eventStoreDB;


    @Override
    @Traced
    public Uni<String> handle(CreateBankAccountCommand command) {
        final var aggregate = new BankAccountAggregate(command.aggregateID());
        aggregate.createBankAccount(command.email(), command.address(), command.userName());
        return eventStoreDB.save(aggregate).replaceWith(aggregate.getId())
                .onItem().invoke(() -> logger.infof("crated bank account: %s", aggregate));
    }

    @Override
    @Traced
    public Uni<Void> handle(ChangeEmailCommand command) {
        return eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class)
                .onItem().transform(aggregate -> {
                    aggregate.changeEmail(command.newEmail());
                    return aggregate;
                })
                .chain(aggregate -> eventStoreDB.save(aggregate))
                .onItem().invoke(() -> logger.infof("changed email: %s, id: %s", command.newEmail(), command.aggregateID()));
    }

    @Override
    @Traced
    public Uni<Void> handle(ChangeAddressCommand command) {
        return eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class)
                .onItem().transform(aggregate -> {
                    aggregate.changeAddress(command.newAddress());
                    return aggregate;
                })
                .chain(aggregate -> eventStoreDB.save(aggregate))
                .onItem().invoke(() -> logger.infof("changed address: %s, id: %s", command.newAddress(), command.aggregateID()));
    }

    @Override
    @Traced
    public Uni<Void> handle(DepositAmountCommand command) {
        return eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class)
                .onItem().transform(aggregate -> {
                    aggregate.depositBalance(command.amount());
                    return aggregate;
                })
                .chain(aggregate -> eventStoreDB.save(aggregate))
                .onItem().invoke(() -> logger.infof("deposited amount: %s, id: %s", command.amount(), command.aggregateID()));
    }
}
Enter fullscreen mode Exit fullscreen mode

Jaeger

In Event Sourcing, Projections (also known as View Models or Query Models) provide a view of the underlying event-based data model.
Often they represent the logic of translating the source write model into the read model.
The idea is that the projection will receive all the events that it is able to project and will do the normal CRUD operations on the read model it controls,
using the normal CRUD operations provided by the read model database.
Projections aren’t limited to only processing events of a single entity and can assemble and aggregate data for multiple entities, even for different types of entities.
Events appended in the event store trigger the projection logic that creates or updates the read model.
We can subscribe to our projections for order-type stream events.
When we execute a command, the aggregate generates a new event that represents the state transitions of the aggregate.
Those events are committed to the store, so the store appends them to the end of the aggregate stream.
A projection receives these events and updates its read models, using When method, like aggregate it applies changes depending on the event type:

@ApplicationScoped
public class BankAccountMongoProjection implements Projection {

    private final static Logger logger = Logger.getLogger(BankAccountMongoProjection.class);

    @Inject
    BankAccountMongoPanacheRepository panacheRepository;

    @Inject
    EventStoreDB eventStore;

    @Incoming(value = "eventstore-in")
    @Traced
    public Uni<Void> process(Message<byte[]> message) {
        logger.infof("(consumer) process >>> events: %s", new String(message.getPayload()));
        final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(message.getPayload());

        if (events.length == 0)
            return Uni.createFrom().voidItem()
                    .onItem().invoke(() -> logger.warn("empty events list"))
                    .onItem().invoke(message::ack)
                    .onFailure().invoke(ex -> logger.error("process msg ack exception", ex));

        return Multi.createFrom().iterable(List.of(events))
                .onItem().call(event -> this.when(event)
                        .onFailure().call(() -> panacheRepository.deleteByAggregateId(events[0].getAggregateId())
                                .onFailure().invoke(ex -> logger.error("panacheRepository.deleteByAggregateId id: %s", events[0].getAggregateId(), ex))
                                .onItem().call(e -> eventStore.load(events[0].getAggregateId(), BankAccountAggregate.class)
                                        .onFailure().invoke(ex -> logger.error("eventStore.load", ex))
                                        .onItem().call(bankAccountAggregate -> panacheRepository.persist(BankAccountMapper.bankAccountDocumentFromAggregate(bankAccountAggregate))))
                                .onFailure().invoke(ex -> logger.error("panacheRepository.persist bankAccountAggregate", ex))))
                .toUni().replaceWithVoid()
                .onItem().invoke(v -> message.ack())
                .onFailure().invoke(ex -> logger.error("consumer process events aggregateId: %s", events[0].getAggregateId(), ex));
    }

    @Traced
    public Uni<Void> when(Event event) {
        final var aggregateId = event.getAggregateId();
        logger.infof("(when) >>>>> aggregateId: %s", aggregateId);

        switch (event.getEventType()) {
            case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 -> {
                return handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
            }
            case EmailChangedEvent.EMAIL_CHANGED_V1 -> {
                return handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
            }
            case AddressUpdatedEvent.ADDRESS_UPDATED_V1 -> {
                return handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
            }
            case BalanceDepositedEvent.BALANCE_DEPOSITED -> {
                return handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
            }
            default -> {
                return Uni.createFrom().failure(new InvalidEventTypeException(event.getEventType()));
            }
        }
    }

    @Traced
    @Retry(maxRetries = 3, delay = 500)
    @Timeout(value = 5000)
    private Uni<Void> handle(BankAccountCreatedEvent event) {
        logger.infof("(when) BankAccountCreatedEvent: %s, aggregateID: %s", event, event.getAggregateId());

        final var document = BankAccountDocument.builder()
                .aggregateId(event.getAggregateId())
                .email(event.getEmail())
                .address(event.getAddress())
                .userName(event.getUserName())
                .balance(BigDecimal.valueOf(0))
                .build();

        return panacheRepository.persist(document)
                .onItem().invoke(result -> logger.infof("persist document result: %s", result))
                .onFailure().invoke(ex -> logger.error("handle BankAccountCreatedEvent persist aggregateID: %s", event.getAggregateId(), ex))
                .replaceWithVoid();
    }

    @Traced
    @Retry(maxRetries = 3, delay = 500)
    @Timeout(value = 5000)
    private Uni<Void> handle(EmailChangedEvent event) {
        logger.infof("(when) EmailChangedEvent: %s, aggregateID: %s", event, event.getAggregateId());

        return panacheRepository.findByAggregateId(event.getAggregateId())
                .onFailure().invoke(ex -> logger.error("handle EmailChangedEvent findByAggregateId aggregateID: %s", event.getAggregateId(), ex))
                .chain(bankAccountDocument -> {
                    bankAccountDocument.setEmail(event.getNewEmail());
                    return panacheRepository.update(bankAccountDocument);
                })
                .onFailure().invoke(ex -> logger.error("handle EmailChangedEvent update aggregateID: %s", event.getAggregateId(), ex))
                .onItem().invoke(updatedDocument -> logger.infof("(EmailChangedEvent) updatedDocument: %s", updatedDocument))
                .replaceWithVoid();
    }

    @Traced
    @Retry(maxRetries = 3, delay = 500)
    @Timeout(value = 5000)
    private Uni<Void> handle(AddressUpdatedEvent event) {
        logger.infof("(when) AddressUpdatedEvent: %s, aggregateID: %s", event, event.getAggregateId());

        return panacheRepository.findByAggregateId(event.getAggregateId())
                .onFailure().invoke(ex -> logger.error("handle EmailChangedEvent findByAggregateId aggregateID: %s", event.getAggregateId(), ex))
                .chain(bankAccountDocument -> {
                    bankAccountDocument.setAddress(event.getNewAddress());
                    return panacheRepository.update(bankAccountDocument);
                })
                .onFailure().invoke(ex -> logger.error("handle AddressUpdatedEvent update aggregateID: %s", event.getAggregateId(), ex))
                .onItem().invoke(updatedDocument -> logger.infof("(AddressUpdatedEvent) updatedDocument: %s", updatedDocument))
                .replaceWithVoid();
    }

    @Traced
    @Retry(maxRetries = 3, delay = 500)
    @Timeout(value = 5000)
    private Uni<Void> handle(BalanceDepositedEvent event) {
        logger.infof("(when) BalanceDepositedEvent: %s, aggregateID: %s", event, event.getAggregateId());

        return panacheRepository.findByAggregateId(event.getAggregateId())
                .onFailure().invoke(ex -> logger.error("handle EmailChangedEvent findByAggregateId aggregateID: %s", event.getAggregateId(), ex))
                .chain(bankAccountDocument -> {
                    final var balance = bankAccountDocument.getBalance();
                    bankAccountDocument.setBalance(balance.add(event.getAmount()));
                    return panacheRepository.update(bankAccountDocument);
                })
                .onFailure().invoke(ex -> logger.error("handle BalanceDepositedEvent update aggregateID: %s", event.getAggregateId(), ex))
                .onItem().invoke(updatedDocument -> logger.infof("(BalanceDepositedEvent) updatedDocument: %s", updatedDocument))
                .replaceWithVoid();
    }
}
Enter fullscreen mode Exit fullscreen mode

Swagger

Queries in CQRS represent the intention to get data and are responsible for returning the result of the requested query.
The read model can be but doesn’t have to be, derived from the write model.
It’s a transformation of the results of the business operation into a readable form.
One of the great outcomes of having an event-sourced system is the ability to create new read models at will, at any time, without affecting anything else.
Then we can retrieve projection data using query service:

@ApplicationScoped
public class BankAccountQueryHandler implements BankAccountQueryService {

    private final static Logger logger = Logger.getLogger(BankAccountQueryHandler.class);

    @Inject
    EventStoreDB eventStoreDB;

    @Inject
    BankAccountMongoPanacheRepository panacheRepository;

    @Override
    @Traced
    public Uni<BankAccountResponseDTO> handle(GetBankAccountByIDQuery query) {
        return panacheRepository.findByAggregateId(query.aggregateID())
                .onItem().transform(BankAccountMapper::bankAccountResponseDTOFromDocument)
                .onItem().invoke(bankAccountResponseDTO -> logger.infof("(FIND panacheRepository.findByAggregateId) bankAccountResponseDTO: %s", bankAccountResponseDTO))
                .onFailure().invoke(ex -> logger.errorf("mongo aggregate not found: %s", ex.getMessage()))
                .onFailure().recoverWithUni(e -> eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class)
                        .onFailure().invoke(ex -> logger.error("eventStoreDB.load", ex))
                        .onItem().invoke(bankAccountAggregate -> logger.infof("(eventStoreDB.load) >>> bankAccountAggregate: %s", bankAccountAggregate))
                        .onItem().call(bankAccountAggregate -> panacheRepository.persist(BankAccountMapper.bankAccountDocumentFromAggregate(bankAccountAggregate))
                                .onItem().invoke(bankAccountDocument -> logger.infof("(panacheRepository.persist) >>> bankAccountDocument: %s", bankAccountDocument)))
                        .onFailure().invoke(ex -> logger.error("persist", ex))
                        .onItem().transform(BankAccountMapper::bankAccountResponseDTOFromAggregate)
                        .onItem().invoke(bankAccountResponseDTO -> logger.infof("(bankAccountResponseDTO) >>> bankAccountResponseDTO: %s", bankAccountResponseDTO)));
    }

    @Override
    @Traced
    public Uni<List<BankAccountDocument>> handle(FindAllByBalanceQuery query) {
        return panacheRepository.findAllSortByBalanceWithPagination(query.page())
                .onItem().invoke(result -> logger.infof("(findAllSortByBalanceWithPagination) query: %s", query));
    }
}
Enter fullscreen mode Exit fullscreen mode

Jaeger

Quarkus Mongo panache repository:

@ApplicationScoped
@Traced
public class BankAccountMongoPanacheRepository implements ReactivePanacheMongoRepository<BankAccountDocument> {

    public Uni<BankAccountDocument> findByAggregateId(String aggregateId) {
        return find("aggregateId", aggregateId).firstResult();
    }

    public Uni<Long> deleteByAggregateId(String aggregateId) {
        return delete("aggregateId", aggregateId);
    }

    public Uni<List<BankAccountDocument>> findAllSortByBalanceWithPagination(Page page) {
        return findAll(Sort.ascending("balance")).page(page).list();
    }
}
Enter fullscreen mode Exit fullscreen mode

More details and source code of the full project you can find here,
of course in real-world applications, we have to implement many more necessary features, like k8s health checks, rate limiters, etc.,
depending on the project it can be implemented in different ways, for example, you can use Kubernetes and Istio for some of them.
I hope this article is usefully and helpfully, and be happy to receive any feedback or questions, feel free to contact me by email or any messengers :)

Latest comments (0)