DEV Community

Cover image for Java Spring EventSourcing and CQRS Clean Architecture microservice πŸ‘‹βš‘οΈπŸ’«
Alexander
Alexander

Posted on • Updated on

Java Spring EventSourcing and CQRS Clean Architecture microservice πŸ‘‹βš‘οΈπŸ’«

In this article let's try to create closer to real world Event Sourcing and CQRS microservice using: πŸš€πŸ‘¨β€πŸ’»πŸ™Œ

πŸ‘¨β€πŸ’» Full list what has been used:

Source code you can find in GitHub repository

The main idea of this project is the implementation of Event Sourcing and CQRS using Java, Spring and EventStore using Postgresql,
previously have written same article where implemented the same using Go and EventStoreDB,
think EventStoreDB is the best choice for event sourcing, but in real life at some projects we usually have business restrictions and for example
usage of the EventStoreDB can be not allowed, in this case think postgres and kafka is good alternative for implementing our own event store.
Didn't write in this article about Event Sourcing and CQRS patterns,th e best place to read is microservices.io,
blog and documentation of this article is very good too,
and as written in previous article, highly recommend Alexey Zimarev "Hands-on Domain-Driven Design with .NET Core" book and also his blog.

In this project we have microservice events tore implemented using PostgreSQL,Spring Data JPA
and Kafka, for projections
used Spring Data MongoDB and communicate by REST.

Did not implement here any interesting business logic and didn't cover tests, because don't have enough time,
the events list simplified: create a new bank account, change email or address and deposit money,
of course in real-world better use more concrete and meaningfully events, but the target here is to show the idea and how it works.

All UI interfaces will be available on ports:

Swagger UI: http://localhost:8006/swagger-ui/index.html

Swagger

Jaeger UI: http://localhost:16686

Jaeger

Prometheus UI: http://localhost:9090

Prometheus

Grafana UI: http://localhost:3005

Grafana

Docker compose file for this project:

version: "3.9"

services:
  postgresql:
    image: postgres:14.2
    container_name: postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=microservices
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zookeeper:
    image: 'bitnami/zookeeper:3.8.0'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka:
    image: 'bitnami/kafka:3.0.1'
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "./kafka_data:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    networks: [ "microservices" ]

  redis:
    image: redis:6-alpine
    restart: always
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks: [ "microservices" ]

  mongodb:
    image: docker.io/bitnami/mongodb:4.4
    restart: always
    container_name: microservices_mongo
    environment:
      MONGODB_ROOT_USER: admin
      MONGODB_ROOT_PASSWORD: admin
      BITNAMI_DEBUG: "false"
      ALLOW_EMPTY_PASSWORD: "no"
      MONGODB_SYSTEM_LOG_VERBOSITY: "0"
      MONGODB_DISABLE_SYSTEM_LOG: "no"
      MONGODB_DISABLE_JAVASCRIPT: "no"
      MONGODB_ENABLE_JOURNAL: "yes"
      MONGODB_ENABLE_IPV6: "no"
      MONGODB_ENABLE_DIRECTORY_PER_DB: "no"
      MONGODB_DATABASE: "microservices"
    volumes:
      - ./mongodb_data_container:/data/db
    ports:
      - "27017:27017"
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices
Enter fullscreen mode Exit fullscreen mode

AggregateRoot can be implemented in different ways, the main methods is load events - apply and raise changes.
When we fetch the aggregate from the database, instead of reading its state as one record in a table or document,
we read all events that were saved before and call when method for each.
After all these steps, we will recover all the history of a given aggregate.
By doing this, we will be bringing our aggregate to its latest state.

@Data
@NoArgsConstructor
public abstract class AggregateRoot {

    protected String id;
    protected String type;
    protected long version;
    protected final List<Event> changes = new ArrayList<>();

    public AggregateRoot(final String id, final String aggregateType) {
        this.id = id;
        this.type = aggregateType;
    }


    public abstract void when(final Event event);

    public void load(final List<Event> events) {
        events.forEach(event -> {
            this.validateEvent(event);
            this.raiseEvent(event);
            this.version++;
        });
    }

    public void apply(final Event event) {
        this.validateEvent(event);
        event.setAggregateType(this.type);

        when(event);
        changes.add(event);

        this.version++;
        event.setVersion(this.version);
    }

    public void raiseEvent(final Event event) {
        this.validateEvent(event);

        event.setAggregateType(this.type);
        when(event);

        this.version++;
    }

    public void clearChanges() {
        this.changes.clear();
    }

    private void validateEvent(final Event event) {
        if (Objects.isNull(event) || !event.getAggregateId().equals(this.id))
            throw new InvalidEventException(event.toString());
    }
}
Enter fullscreen mode Exit fullscreen mode

An event represents a fact that took place in the domain. They are the source of truth; your current state is derived from the events.
They are immutable and represent the business facts.
It means that we never change or remove anything in the database, we only append new events.

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Event {
    private UUID id;
    private String aggregateId;
    private String eventType;
    private String aggregateType;
    private long version;
    private byte[] data;
    private byte[] metaData;
    private LocalDateTime timeStamp;


    public Event(String eventType, String aggregateType) {
        this.id = UUID.randomUUID();
        this.eventType = eventType;
        this.aggregateType = aggregateType;
        this.timeStamp = LocalDateTime.now();
    }
}
Enter fullscreen mode Exit fullscreen mode

If we follow the Event Sourcing pattern literally, we needs to get all these transactions to calculate the current account's balance.
This won't be efficient. Your first thought to make this more efficient may be caching the latest state somewhere.
Instead of retrieving all these events, we could retrieve one record and use it for our business logic. This is a Snapshot.
In this microservice we make snapshot from aggregate and save it in postgres every aggregate version % N times,
when we load aggregate from event store, first we load snapshot and then events which have version greater than snapshot.

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Snapshot {

    private UUID id;
    private String aggregateId;
    private String aggregateType;
    private byte[] data;
    private byte[] metaData;
    private long version;
    private LocalDateTime timeStamp;
}
Enter fullscreen mode Exit fullscreen mode

Implementation of AggregateStore is Load, Save and Exists methods,
Load and Save accept aggregate then load or apply events using EventStoreDB client.
The Load method: find out the stream name for an aggregate, read all the events from the aggregate stream,
loop through all the events, and call the RaiseEvent handler for each of them.
And the Save method persists aggregates by saving the history of changes, handling concurrency,
when you retrieve a stream from EventStoreDB, you take note of the current version number,
then when you save it back you can determine if somebody else has modified the record in the meantime.
The postgres event store implementation:

@Repository
@RequiredArgsConstructor
@Slf4j
public class EventStore implements EventStoreDB {

    public static final int SNAPSHOT_FREQUENCY = 3;
    private static final String SAVE_EVENTS_QUERY = "INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())";
    private static final String LOAD_EVENTS_QUERY = "SELECT event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp FROM events e WHERE e.aggregate_id = :aggregate_id AND e.version > :version ORDER BY e.version ASC";
    private static final String SAVE_SNAPSHOT_QUERY = "INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) ON CONFLICT (aggregate_id) DO UPDATE SET data = :data, version = :version, timestamp = now()";
    private static final String HANDLE_CONCURRENCY_QUERY = "SELECT aggregate_id FROM events e WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE";
    private static final String LOAD_SNAPSHOT_QUERY = "SELECT aggregate_id, aggregate_type, data, metadata, version, timestamp FROM snapshots s WHERE s.aggregate_id = :aggregate_id";
    private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id";

    private final NamedParameterJdbcTemplate jdbcTemplate;
    private final EventBus eventBus;


    @Override
    @Transactional
    @NewSpan
    public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
        final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());

        if (aggregate.getVersion() > 1) {
            this.handleConcurrency(aggregate.getId());
        }

        this.saveEvents(aggregate.getChanges());
        if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
            this.saveSnapshot(aggregate);
        }

        eventBus.publish(aggregateEvents);

        log.info("(save) saved aggregate: {}", aggregate);
    }

    @Override
    @Transactional(readOnly = true)
    @NewSpan
    public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {

        final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);

        final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);

        final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
        events.forEach(event -> {
            aggregate.raiseEvent(event);
            log.info("raise event version: {}", event.getVersion());
        });

        if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);

        log.info("(load) loaded aggregate: {}", aggregate);
        return aggregate;
    }

    @Override
    @NewSpan
    public void saveEvents(@SpanTag("events") List<Event> events) {
        if (events.isEmpty()) return;

        final List<Event> changes = new ArrayList<>(events);
        if (changes.size() > 1) {
            this.eventsBatchInsert(changes);
            return;
        }

        final Event event = changes.get(0);
        int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, mapFromEvent(event));
        log.info("(saveEvents) saved result: {}, event: {}", result, event);
    }

    private Map<String, Serializable> mapFromEvent(Event event) {
        return Map.of(
                AGGREGATE_ID, event.getAggregateId(),
                AGGREGATE_TYPE, event.getAggregateType(),
                EVENT_TYPE, event.getEventType(),
                DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
                METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
                VERSION, event.getVersion());
    }


    @NewSpan
    private void eventsBatchInsert(@SpanTag("events") List<Event> events) {
        final var args = events.stream().map(this::mapFromEvent).toList();
        final Map<String, ?>[] maps = args.toArray(new Map[0]);
        int[] ints = jdbcTemplate.batchUpdate(SAVE_EVENTS_QUERY, maps);
        log.info("(saveEvents) BATCH saved result: {}, event: {}", ints);
    }

    @Override
    @NewSpan
    public List<Event> loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) {
        return jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of(AGGREGATE_ID, aggregateId, VERSION, version),
                (rs, rowNum) -> Event.builder()
                        .aggregateId(rs.getString(AGGREGATE_ID))
                        .aggregateType(rs.getString(AGGREGATE_TYPE))
                        .eventType(rs.getString(EVENT_TYPE))
                        .data(rs.getBytes(DATA))
                        .metaData(rs.getBytes(METADATA))
                        .version(rs.getLong(VERSION))
                        .timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
                        .build());
    }

    @NewSpan
    private <T extends AggregateRoot> void saveSnapshot(@SpanTag("aggregate") T aggregate) {
        aggregate.toSnapshot();
        final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);

        int updateResult = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY,
                Map.of(AGGREGATE_ID, snapshot.getAggregateId(),
                        AGGREGATE_TYPE, snapshot.getAggregateType(),
                        DATA, Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
                        METADATA, Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
                        VERSION, snapshot.getVersion()));

        log.info("(saveSnapshot) updateResult: {}", updateResult);
    }


    @NewSpan
    private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
        try {
            String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
            log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
        } catch (EmptyResultDataAccessException e) {
            log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage());
        }
        log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
    }

    @NewSpan
    private Optional<Snapshot> loadSnapshot(@SpanTag("aggregateId") String aggregateId) {
        return jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of(AGGREGATE_ID, aggregateId), (rs, rowNum) -> Snapshot.builder()
                .aggregateId(rs.getString(AGGREGATE_ID))
                .aggregateType(rs.getString(AGGREGATE_TYPE))
                .data(rs.getBytes(DATA))
                .metaData(rs.getBytes(METADATA))
                .version(rs.getLong(VERSION))
                .timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
                .build()).stream().findFirst();
    }

    @NewSpan
    private <T extends AggregateRoot> T getAggregate(@SpanTag("aggregateId") final String aggregateId, @SpanTag("aggregateType") final Class<T> aggregateType) {
        try {
            return aggregateType.getConstructor(String.class).newInstance(aggregateId);
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    @NewSpan
    private <T extends AggregateRoot> T getSnapshotFromClass(@SpanTag("snapshot") Optional<Snapshot> snapshot, @SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
        if (snapshot.isEmpty()) {
            final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
            return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
        }
        return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
    }


    @Override
    @NewSpan
    public Boolean exists(@SpanTag("aggregateId") String aggregateId) {
        try {
            final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
            log.info("aggregate exists id: {}", id);
            return true;
        } catch (Exception ex) {
            if (!(ex instanceof EmptyResultDataAccessException)) {
                throw new RuntimeException(ex);
            }
            return false;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

For the next step let's create a bank account aggregate:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class BankAccountAggregate extends AggregateRoot {


    public static final String AGGREGATE_TYPE = "BankAccountAggregate";

    public BankAccountAggregate(String id) {
        super(id, AGGREGATE_TYPE);
    }

    private String email;
    private String userName;
    private String address;
    private BigDecimal balance;


    @Override
    public void when(Event event) {
        switch (event.getEventType()) {
            case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
            case EmailChangedEvent.EMAIL_CHANGED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
            case AddressUpdatedEvent.ADDRESS_UPDATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
            case BalanceDepositedEvent.BALANCE_DEPOSITED ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
            default -> throw new InvalidEventTypeException(event.getEventType());
        }
    }

    private void handle(final BankAccountCreatedEvent event) {
        this.email = event.getEmail();
        this.userName = event.getUserName();
        this.address = event.getAddress();
        this.balance = BigDecimal.valueOf(0);
    }

    private void handle(final EmailChangedEvent event) {
        Objects.requireNonNull(event.getNewEmail());
        if (event.getNewEmail().isBlank()) throw new InvalidEmailException();
        this.email = event.getNewEmail();
    }

    private void handle(final AddressUpdatedEvent event) {
        Objects.requireNonNull(event.getNewAddress());
        if (event.getNewAddress().isBlank()) throw new InvalidAddressException();
        this.address = event.getNewAddress();
    }

    private void handle(final BalanceDepositedEvent event) {
        Objects.requireNonNull(event.getAmount());
        this.balance = this.balance.add(event.getAmount());
    }

    public void createBankAccount(String email, String address, String userName) {
        final var data = BankAccountCreatedEvent.builder()
                .aggregateId(id)
                .email(email)
                .address(address)
                .userName(userName)
                .build();

        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1, dataBytes, null);
        this.apply(event);
    }

    public void changeEmail(String email) {
        final var data = EmailChangedEvent.builder().aggregateId(id).newEmail(email).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(EmailChangedEvent.EMAIL_CHANGED_V1, dataBytes, null);
        apply(event);

    }

    public void changeAddress(String newAddress) {
        final var data = AddressUpdatedEvent.builder().aggregateId(id).newAddress(newAddress).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(AddressUpdatedEvent.ADDRESS_UPDATED_V1, dataBytes, null);
        apply(event);
    }

    public void depositBalance(BigDecimal amount) {
        final var data = BalanceDepositedEvent.builder().aggregateId(id).amount(amount).build();
        final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
        final var event = this.createEvent(BalanceDepositedEvent.BALANCE_DEPOSITED, dataBytes, null);
        apply(event);
    }
}
Enter fullscreen mode Exit fullscreen mode

Our microservice accept http requests:
swagger
For swagger used Swagger OpenAPI 3.
The bank account REST controller, which accept requests, validate it using Hibernate Validator,
then call command or query service.
The main reason for CQRS gaining popularity is the ability to handle reads and writes separately due to severe differences in optimization techniques
for those much more distinct operations.

@RestController
@RequestMapping(path = "/api/v1/bank")
@Slf4j
@RequiredArgsConstructor
public class BankAccountController {

    private final BankAccountCommandService commandService;
    private final BankAccountQueryService queryService;

    @GetMapping("{aggregateId}")
    public ResponseEntity<BankAccountResponseDTO> getBankAccount(@PathVariable String aggregateId) {
        final var result = queryService.handle(new GetBankAccountByIDQuery(aggregateId));
        log.info("GET bank account result: {}", result);
        return ResponseEntity.ok(result);
    }

    @PostMapping
    public ResponseEntity<String> createBankAccount(@Valid @RequestBody CreateBankAccountRequestDTO dto) {
        final var aggregateID = UUID.randomUUID().toString();
        final var id = commandService.handle(new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address()));
        log.info("CREATE bank account id: {}", id);
        return ResponseEntity.status(HttpStatus.CREATED).body(id);
    }

    @PostMapping(path = "/deposit/{aggregateId}")
    public ResponseEntity<Void> depositAmount(@Valid @RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) {
        commandService.handle(new DepositAmountCommand(aggregateId, dto.amount()));
        return ResponseEntity.ok().build();
    }

    @PostMapping(path = "/email/{aggregateId}")
    public ResponseEntity<Void> changeEmail(@Valid @RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) {
        commandService.handle(new ChangeEmailCommand(aggregateId, dto.newEmail()));
        return ResponseEntity.ok().build();
    }

    @PostMapping(path = "/address/{aggregateId}")
    public ResponseEntity<Void> changeAddress(@Valid @RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) {
        commandService.handle(new ChangeAddressCommand(aggregateId, dto.newAddress()));
        return ResponseEntity.ok().build();
    }

    @GetMapping("/balance")
    public ResponseEntity<Page<BankAccountResponseDTO>> getAllOrderByBalance(@RequestParam(name = "page", defaultValue = "0") Integer page,
                                                                             @RequestParam(name = "size", defaultValue = "10") Integer size) {

        final var result = queryService.handle(new FindAllOrderByBalance(page, size));
        log.info("GET all by balance result: {}", result);
        return ResponseEntity.ok(result);
    }
}
Enter fullscreen mode Exit fullscreen mode

The command service handle cqrs commands, load aggregate from event store and call its methods depend on business logic of the application,
aggregate applies these changes and then we save this events changes list in event store.
For microservice resilience used here Resilience4j,
it can be configured in properties file, and it's the nice feature because of in the real world very common case when you need to apply some changes in k8s yaml.

@RequiredArgsConstructor
@Slf4j
@Service
public class BankAccountCommandHandler implements BankAccountCommandService {

    private final EventStoreDB eventStoreDB;
    private static final String SERVICE_NAME = "microservice";

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public String handle(@SpanTag("command") CreateBankAccountCommand command) {
        final var aggregate = new BankAccountAggregate(command.aggregateID());
        aggregate.createBankAccount(command.email(), command.address(), command.userName());
        eventStoreDB.save(aggregate);

        log.info("(CreateBankAccountCommand) aggregate: {}", aggregate);
        return aggregate.getId();
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public void handle(@SpanTag("command") ChangeEmailCommand command) {
        final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
        aggregate.changeEmail(command.newEmail());
        eventStoreDB.save(aggregate);
        log.info("(ChangeEmailCommand) aggregate: {}", aggregate);
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public void handle(@SpanTag("command") ChangeAddressCommand command) {
        final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
        aggregate.changeAddress(command.newAddress());
        eventStoreDB.save(aggregate);
        log.info("(ChangeAddressCommand) aggregate: {}", aggregate);
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public void handle(@SpanTag("command") DepositAmountCommand command) {
        final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
        aggregate.depositBalance(command.amount());
        eventStoreDB.save(aggregate);
        log.info("(DepositAmountCommand) aggregate: {}", aggregate);
    }
}
Enter fullscreen mode Exit fullscreen mode

Jaeger
The process of building a piece of state from events is called a projection.
We can subscribe our projections for order type stream events.
When we execute a command, the aggregate generates a new event that represents the state transitions of the aggregate.
Those events are committed to the store, so the store appends them to the end of the aggregate stream.
A projection receives these events and updates its read models, using When method, like aggregate it applies changes depending on the event type:

@Service
@Slf4j
@RequiredArgsConstructor
public class BankAccountMongoProjection implements Projection {

    private final BankAccountMongoRepository mongoRepository;
    private final EventStoreDB eventStoreDB;
    private static final String SERVICE_NAME = "microservice";


    @KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
            groupId = "${microservice.kafka.groupId}",
            concurrency = "${microservice.kafka.default-concurrency}")
    public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) {
        log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}, data: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), new String(data));

        try {
            final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
            this.processEvents(Arrays.stream(events).toList());
            ack.acknowledge();
            log.info("ack events: {}", Arrays.toString(events));
        } catch (Exception ex) {
            ack.nack(100);
            log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), ex);
        }
    }

    @NewSpan
    private void processEvents(@SpanTag("events") List<Event> events) {
        if (events.isEmpty()) return;

        try {
            events.forEach(this::when);
        } catch (Exception ex) {
            mongoRepository.deleteByAggregateId(events.get(0).getAggregateId());
            final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class);
            final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate);
            final var result = mongoRepository.save(document);
            log.info("(processEvents) saved document: {}", result);
        }
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public void when(@SpanTag("event") Event event) {
        final var aggregateId = event.getAggregateId();
        log.info("(when) >>>>> aggregateId: {}", aggregateId);

        switch (event.getEventType()) {
            case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
            case EmailChangedEvent.EMAIL_CHANGED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
            case AddressUpdatedEvent.ADDRESS_UPDATED_V1 ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
            case BalanceDepositedEvent.BALANCE_DEPOSITED ->
                    handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
            default -> log.error("unknown event type: {}", event.getEventType());
        }
    }


    @NewSpan
    private void handle(@SpanTag("event") BankAccountCreatedEvent event) {
        log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());

        final var document = BankAccountDocument.builder()
                .aggregateId(event.getAggregateId())
                .email(event.getEmail())
                .address(event.getAddress())
                .userName(event.getUserName())
                .balance(BigDecimal.valueOf(0))
                .build();

        final var insert = mongoRepository.insert(document);
        log.info("(BankAccountCreatedEvent) insert: {}", insert);
    }

    @NewSpan
    private void handle(@SpanTag("event") EmailChangedEvent event) {
        log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
        final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
        if (documentOptional.isEmpty())
            throw new BankAccountDocumentNotFoundException(event.getAggregateId());

        final var document = documentOptional.get();
        document.setEmail(event.getNewEmail());
        mongoRepository.save(document);
    }

    @NewSpan
    private void handle(@SpanTag("event") AddressUpdatedEvent event) {
        log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
        final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
        if (documentOptional.isEmpty())
            throw new BankAccountDocumentNotFoundException(event.getAggregateId());

        final var document = documentOptional.get();
        document.setAddress(event.getNewAddress());
        mongoRepository.save(document);
    }

    @NewSpan
    private void handle(@SpanTag("event") BalanceDepositedEvent event) {
        log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
        final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
        if (documentOptional.isEmpty())
            throw new BankAccountDocumentNotFoundException(event.getAggregateId());

        final var document = documentOptional.get();
        final var newBalance = document.getBalance().add(event.getAmount());
        document.setBalance(newBalance);
        mongoRepository.save(document);
    }
}
Enter fullscreen mode Exit fullscreen mode

Then we can retrieve projection data using query service:

@Slf4j
@RequiredArgsConstructor
@Service
public class BankAccountQueryHandler implements BankAccountQueryService {

    private final EventStoreDB eventStoreDB;
    private final BankAccountMongoRepository mongoRepository;
    private static final String SERVICE_NAME = "microservice";

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) {
        Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
        if (optionalDocument.isPresent()) {
            return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get());
        }

        final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
        final var savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
        log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument);

        final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
        log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
        return bankAccountResponseDTO;
    }

    @Override
    @NewSpan
    @Retry(name = SERVICE_NAME)
    @CircuitBreaker(name = SERVICE_NAME)
    public Page<BankAccountResponseDTO> handle(@SpanTag("query") FindAllOrderByBalance query) {
        return mongoRepository.findAll(PageRequest.of(query.page(), query.size(), Sort.by("balance")))
                .map(BankAccountMapper::bankAccountResponseDTOFromDocument);
    }
}
Enter fullscreen mode Exit fullscreen mode

More details and source code of the full project you can find here,
of course in real-world applications, we have to implement many more necessary features, like k8s health checks, rate limiters, etc.,
depending on the project it can be implemented in different ways, for example, you can use Kubernetes and Istio for some of them.
I hope this article is usefully and helpfully, and be happy to receive any feedback or questions, feel free to contact me by email or any messengers :)

Top comments (1)

Collapse
 
lubumbax profile image
lubumbax

Excellent article! Would be great to see this working in Spring Boot 3.x . I believe one of the biggest challenges is moving from Sleuth to Micrometer Tracing. Cheers!