DEV Community

Cover image for Go EventSourcing and CQRS with PostgreSQL, Kafka, MongoDB and ElasticSearch 👋✨💫
Alexander
Alexander

Posted on

Go EventSourcing and CQRS with PostgreSQL, Kafka, MongoDB and ElasticSearch 👋✨💫

๐Ÿ‘จโ€๐Ÿ’ป Full list what has been used:

PostgreSQL as event store database
Kafka as messages broker
gRPC Go implementation of gRPC
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
MongoDB MongoDB database
Elasticsearch Elasticsearch client for Go.
Echo web framework
Kibana Kibana is data visualization dashboard software for Elasticsearch
Migrate for migrations

Source code you can find in GitHub repository.
The main idea of this project is the implementation of Event Sourcing and CQRS using Go, Postgresql, Kafka for event store and Mongo, ElasticSearch for read projections.
Previously I have written similar articles where I implemented the same microservice using Go and EventStoreDB,
and Spring.
As written before, I repeat: I think EventStoreDB is the best choice for event sourcing, but in real life on some projects we usually have business restrictions and, for example,
usage of EventStoreDB may not be allowed; in this case, I think Postgres and Kafka are a good alternative for implementing our own event store.
If you are not familiar with the EventSourcing and CQRS patterns, the best place to read is microservices.io,
blog and documentation of eventstore site is very good too,
and highly recommend Alexey Zimarev "Hands-on Domain-Driven Design with .NET Core" book.

In this project we have a microservice with an event store implemented using PostgreSQL and Kafka,
as read databases for projections MongoDB and Elasticsearch.
Some descriptions in this article repeat the previous ones, because this is another implementation, this time with Postgres and Kafka, but the idea is the same.

All UI interfaces will be available on ports:

Jaeger UI: http://localhost:16686

Jaeger

Prometheus UI: http://localhost:9090

Prometheus

Grafana UI: http://localhost:3005

Grafana

Docker compose file for this project:



version: "3.9"

services:
  # PostgreSQL instance used as the event store database.
  es_postgesql:
    image: postgres:14.4
    container_name: es_postgesql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      # POSTGRES_HOST=5432 was removed: it is not one of the variables the
      # official postgres image documents, and its value was a port, not a host.
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=bank_accounts
    command: -p 5432
    volumes:
      - ./microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  # ZooKeeper node; the kafka service points at it via zookeeper:2181.
  zookeeper:
    image: "bitnami/zookeeper:3.8.0"
    ports:
      - "2181:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    volumes:
      - "./zookeeper:/zookeeper"
    networks:
      - microservices

  # Kafka broker exposing two plaintext listeners:
  #   CLIENT   - advertised as kafka:9092 (also used for inter-broker traffic)
  #   EXTERNAL - advertised as localhost:9093
  kafka:
    image: 'bitnami/kafka:3.2.0'
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "./kafka_data:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Each listener variable is set exactly once. The earlier
      # PLAINTEXT://:9092 / PLAINTEXT://127.0.0.1:9092 entries were removed:
      # they were overridden by the entries below and used a listener name
      # (PLAINTEXT) that is absent from the security protocol map.
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    networks: [ "microservices" ]

  # MongoDB instance (read-side projection database per the article).
  mongodb:
    image: "mongo:latest"
    restart: always
    environment:
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin
      # NOTE(review): confirm the mongo image actually reads MONGODB_DATABASE.
      - MONGODB_DATABASE=products
    ports:
      - "27017:27017"
    volumes:
      - "./mongodb_data_container:/data/db"
    networks:
      - microservices

  # Jaeger all-in-one tracing backend; the UI is published on port 16686.
  jaeger:
    image: "jaegertracing/all-in-one:1.35"
    container_name: jaeger_container
    restart: always
    environment:
      COLLECTOR_ZIPKIN_HTTP_PORT: "9411"
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks:
      - microservices

  # Single-node Elasticsearch with security disabled.
  node01:
    image: "docker.elastic.co/elasticsearch/elasticsearch:8.2.3"
    container_name: node01
    restart: always
    environment:
      node.name: "node01"
      cluster.name: "es-cluster-8"
      discovery.type: "single-node"
      ES_JAVA_OPTS: "-Xms512m -Xmx512m"
      xpack.license.self_generated.type: "basic"
      # Quoted so the value stays the string "false" rather than a YAML boolean.
      xpack.security.enabled: "false"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "./es-data01:/usr/share/elasticsearch/data"
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - microservices

  # Kibana dashboard pointed at the node01 Elasticsearch service.
  kibana:
    image: "docker.elastic.co/kibana/kibana:8.2.3"
    restart: always
    environment:
      - ELASTICSEARCH_HOSTS=http://node01:9200
    ports:
      - "5601:5601"
    depends_on:
      - node01
    networks:
      - microservices

  # Prometheus server; its configuration is mounted read-only from ./monitoring.
  prometheus:
    image: "prom/prometheus:latest"
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
    volumes:
      - "./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro"
    networks:
      - microservices

  # Prometheus node exporter; container port 9100 is published on host port 9101.
  node_exporter:
    image: "prom/node-exporter"
    container_name: node_exporter_container
    restart: always
    ports:
      - "9101:9100"
    networks:
      - microservices

  # Grafana; container port 3000 is published on host port 3005.
  grafana:
    image: "grafana/grafana"
    container_name: grafana_container
    restart: always
    ports:
      - "3005:3000"
    networks:
      - microservices

# Named volumes.
# NOTE(review): the mongodb and node01 services mount host paths
# (./mongodb_data_container, ./es-data01) rather than these named volumes -
# confirm whether these declarations are still needed.
volumes:
  mongodb_data_container:
  es-data01:
    driver: local

# Network shared by every service in this file.
networks:
  microservices:
    name: microservices


Enter fullscreen mode Exit fullscreen mode

In Event Sourcing we are storing the history of all the actions that have occurred to an entity and deriving the state from that,
it is possible to read back through that history in order to establish what the state was at a given point in time.
It is a pattern for storing data as events in an append-only log.

Every new event is a change.
The AggregateBase should keep track of all the changes that happen during the command execution flow,
so we can persist those changes in the command handler.
Aggregates take the current state, verify the business rules for the particular operation
and apply the business logic that returns the new state. The important part of this process is storing all or nothing.
All aggregated data needs to be saved successfully. If one rule or operation fails then the whole state change is rejected.
AggregateRoot can be implemented in different ways; the main methods are load events, apply, and raise changes.
When we fetch the aggregate from the database, instead of reading its state as one record in a table or document,
we read all events that were saved before and call the when method for each.
After all these steps, we will recover all the history of a given aggregate.
By doing this, we will be bringing our aggregate to its latest state.



// AggregateRoot is the interface an event-sourced aggregate exposes: identity
// and type accessors, access to uncommitted changes, the current version,
// snapshot conversion, and the embedded Load, Apply and RaiseEvent interfaces.
type AggregateRoot interface {
    GetID() string
    SetID(id string) *AggregateBase
    GetType() AggregateType
    SetType(aggregateType AggregateType)
    GetChanges() []any
    ClearChanges()
    GetVersion() uint64
    ToSnapshot()
    String() string
    // Load, Apply and RaiseEvent are embedded types declared outside this view;
    // presumably each carries the same-named AggregateBase method - confirm.
    Load
    Apply
    RaiseEvent
}

// AggregateType is the string name identifying a kind of aggregate,
// for example "BankAccount".
type AggregateType string

// AggregateBase holds the state every aggregate shares: identity, version,
// uncommitted changes, aggregate type and the event-handling callback.
type AggregateBase struct {
    // ID is the aggregate identifier.
    ID      string
    // Version is incremented once for every event handled by Load, Apply or RaiseEvent.
    Version uint64
    // Changes collects the events added through Apply until ClearChanges resets it.
    Changes []any
    // Type is the aggregate type name.
    Type    AggregateType
    // when is the callback invoked with each event; a non-nil error aborts the operation.
    when    when
}

// NewAggregateBase creates an AggregateBase that dispatches events to the
// given when callback, starting at startVersion with an empty changes slice
// of capacity changesEventsCap. It returns nil when no callback is supplied.
func NewAggregateBase(when when) *AggregateBase {
    if when != nil {
        base := new(AggregateBase)
        base.Version = startVersion
        base.Changes = make([]any, 0, changesEventsCap)
        base.when = when
        return base
    }

    return nil
}

// ClearChanges drops the aggregate's uncommitted events by replacing Changes
// with a fresh empty slice of capacity changesEventsCap.
func (a *AggregateBase) ClearChanges() {
    a.Changes = make([]any, 0, changesEventsCap)
}

// GetChanges returns the aggregate's uncommitted events. The returned slice is
// the aggregate's own Changes slice, not a copy.
func (a *AggregateBase) GetChanges() []any {
    return a.Changes
}

// Load replays previously stored events onto the aggregate in order. Each
// event is passed to the when callback and bumps Version by one; nothing is
// added to the uncommitted changes. It stops at, and returns, the first error.
func (a *AggregateBase) Load(events []any) error {
    for i := range events {
        // RaiseEvent performs exactly the per-event work: when(event), then Version++.
        if err := a.RaiseEvent(events[i]); err != nil {
            return err
        }
    }

    return nil
}

// Apply handles a new event: it is passed to the when callback and, if that
// succeeds, the version is incremented and the event is appended to the
// uncommitted changes. When the callback fails, its error is returned and the
// aggregate's version and changes are left untouched.
func (a *AggregateBase) Apply(event any) error {
    err := a.when(event)
    if err == nil {
        a.Version++
        a.Changes = append(a.Changes, event)
    }

    return err
}

// RaiseEvent passes an event to the when callback and increments the version
// on success, without recording the event as an uncommitted change. It is
// meant for events read back from the event store.
func (a *AggregateBase) RaiseEvent(event any) error {
    handleErr := a.when(event)
    if handleErr != nil {
        return handleErr
    }

    a.Version++
    return nil
}



Enter fullscreen mode Exit fullscreen mode

An event represents a fact that took place in the domain. They are the source of truth, current state is derived from the events.
Events are immutable and represent the business facts.
In Event Sourcing, each operation made on the aggregate should result with the new event.
An event represents a fact that took place in the domain. They are the source of truth, your current state is derived from the events.
They are immutable and represent the business facts.
It means that we never change or remove anything in the database, and we only append new events.

Postgres



// Event is the stored form of a domain event belonging to one aggregate.
type Event struct {
    // EventID identifies this event.
    EventID       string
    // AggregateID identifies the aggregate the event belongs to.
    AggregateID   string
    // EventType names the kind of event; DeserializeEvent switches on it.
    EventType     EventType
    // AggregateType names the kind of aggregate the event belongs to.
    AggregateType AggregateType
    // Version is presumably the aggregate version associated with this event - confirm.
    Version       uint64
    // Data holds the serialized event payload.
    Data          []byte
    // Metadata holds serialized metadata attached to the event.
    Metadata      []byte
    // Timestamp is presumably when the event was created - confirm.
    Timestamp     time.Time
}


Enter fullscreen mode Exit fullscreen mode

Snapshots are the representation of the current state at a certain "point in time".
If we follow the Event Sourcing pattern literally, we need to get all these transactions to calculate the current account's balance.
This won't be efficient. Your first thought to make this more efficient may be caching the latest state somewhere.
Instead of retrieving all these events, we could retrieve one record and use it for our business logic. This is a Snapshot.
The general logic: read the snapshot (if it exists), then read events from the EventStore,
if a snapshot exists, read events since the last stream revision of which snapshot was created, otherwise, read all events.
In our microservice we are storing the snapshot of every N number of events.
Snapshots can be not needed as performance may be good enough.



// Snapshot is a stored copy of an aggregate's state at a given version.
type Snapshot struct {
    // ID is the identifier of the snapshotted aggregate.
    ID      string        `json:"id"`
    // Type is the aggregate type name.
    Type    AggregateType `json:"type"`
    // State is the serialized aggregate; the event store's Load unmarshals it
    // back into the aggregate.
    State   []byte        `json:"state"`
    // Version is the aggregate version recorded with the snapshot.
    Version uint64        `json:"version"`
}


Enter fullscreen mode Exit fullscreen mode

Event store is a key element of a system. Each change that took place in the domain is recorded in the database.
It is specifically designed to store the history of changes, the state is represented by the append-only log of events.
The events are immutable: they cannot be changed.
Implementation of AggregateStore is Load, Save, and Exists methods.
Load and Save accept an aggregate, then load or apply events using the PostgreSQL-backed event store.
The Load method: find out the stream name for an aggregate, read all the events from the aggregate stream,
loop through all the events, and call the RaiseEvent handler for each of them.
And the Save method persists aggregates by saving the history of changes, handling concurrency,
when you retrieve a stream from EventStore, you take note of the current version number,
then when you save it back you can determine if somebody else has modified the record in the meantime.



// Load restores the aggregate from the event store. It first looks up a
// snapshot by aggregate ID: when one exists, its state is unmarshalled into
// the aggregate and loadAggregateEventsByVersion is called (presumably to
// replay the events stored after the snapshot - confirm); otherwise
// loadEvents rebuilds the aggregate. A pgx.ErrNoRows from GetSnapshot is not
// treated as a failure.
func (p *pgEventStore) Load(ctx context.Context, aggregate Aggregate) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Load")
    defer span.Finish()
    span.LogFields(log.String("aggregate", aggregate.String()))

    snapshot, snapshotErr := p.GetSnapshot(ctx, aggregate.GetID())
    if snapshotErr != nil && !errors.Is(snapshotErr, pgx.ErrNoRows) {
        return tracing.TraceWithErr(span, snapshotErr)
    }

    // No snapshot available: take the loadEvents path.
    if snapshot == nil {
        if loadErr := p.loadEvents(ctx, aggregate); loadErr != nil {
            return loadErr
        }

        p.log.Debugf("(Load Aggregate): aggregate: %s", aggregate.String())
        span.LogFields(log.String("aggregate with events", aggregate.String()))
        return nil
    }

    // Snapshot available: restore its state, then load the remaining events.
    if unmarshalErr := serializer.Unmarshal(snapshot.State, aggregate); unmarshalErr != nil {
        p.log.Errorf("(Load) serializer.Unmarshal err: %v", unmarshalErr)
        return tracing.TraceWithErr(span, errors.Wrap(unmarshalErr, "json.Unmarshal"))
    }

    if loadErr := p.loadAggregateEventsByVersion(ctx, aggregate); loadErr != nil {
        return loadErr
    }

    p.log.Debugf("(Load Aggregate By Version) aggregate: %s", aggregate.String())
    span.LogFields(log.String("aggregate with events", aggregate.String()))
    return nil
}

// Save persists the aggregate's uncommitted changes inside one transaction:
// the changes are serialized and written with saveEventsTx, a snapshot is
// written with saveSnapshotTx whenever the aggregate version is a multiple of
// cfg.SnapshotFrequency, the events are handed to processEvents, and only then
// is the transaction committed. With no uncommitted changes it returns nil
// without opening a transaction. On any failure the deferred handler rolls the
// transaction back.
func (p *pgEventStore) Save(ctx context.Context, aggregate Aggregate) (err error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Save")
    defer span.Finish()
    span.LogFields(log.String("aggregate", aggregate.String()))

    changes := aggregate.GetChanges()
    if len(changes) == 0 {
        p.log.Debug("(Save) aggregate.GetChanges()) == 0")
        span.LogFields(log.Int("events", len(changes)))
        return nil
    }

    tx, err := p.db.Begin(ctx)
    if err != nil {
        p.log.Errorf("(Save) db.Begin err: %v", err)
        return tracing.TraceWithErr(span, errors.Wrap(err, "db.Begin"))
    }

    defer func() {
        if tx == nil {
            return
        }
        // After Commit has run, Rollback reports pgx.ErrTxClosed, which is expected.
        txErr := tx.Rollback(ctx)
        if txErr == nil || errors.Is(txErr, pgx.ErrTxClosed) {
            return
        }
        tracing.TraceErr(span, txErr)
        // Keep the error that caused the rollback; only surface the rollback
        // failure when there is nothing more specific to report.
        if err == nil {
            err = txErr
        }
    }()

    events := make([]Event, 0, len(changes))
    for i := range changes {
        event, serializeErr := p.serializer.SerializeEvent(aggregate, changes[i])
        if serializeErr != nil {
            p.log.Errorf("(Save) serializer.SerializeEvent err: %v", serializeErr)
            return tracing.TraceWithErr(span, errors.Wrap(serializeErr, "serializer.SerializeEvent"))
        }
        events = append(events, event)
    }

    if err := p.saveEventsTx(ctx, tx, events); err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "saveEventsTx"))
    }

    // A zero frequency would make the modulo panic with an integer division by
    // zero, so it is treated as "snapshots disabled".
    if frequency := p.cfg.SnapshotFrequency; frequency != 0 && aggregate.GetVersion()%frequency == 0 {
        aggregate.ToSnapshot()
        if err := p.saveSnapshotTx(ctx, tx, aggregate); err != nil {
            return tracing.TraceWithErr(span, errors.Wrap(err, "saveSnapshotTx"))
        }
    }

    // NOTE(review): processEvents runs before Commit, so a failing Commit cannot
    // undo whatever processEvents did - confirm this ordering is intended.
    if err := p.processEvents(ctx, events); err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "processEvents"))
    }

    p.log.Debugf("(Save Aggregate): aggregate: %s", aggregate.String())
    span.LogFields(log.String("aggregate with events", aggregate.String()))
    return tx.Commit(ctx)
}


Enter fullscreen mode Exit fullscreen mode

For event serialization and deserialization we need implementation of the Serializer interface:



// Serializer converts between domain event values and the stored Event form.
type Serializer interface {
    // SerializeEvent builds the stored Event for a domain event raised by aggregate.
    SerializeEvent(aggregate Aggregate, event any) (Event, error)
    // DeserializeEvent turns a stored Event back into its domain event value.
    DeserializeEvent(event Event) (any, error)
}


Enter fullscreen mode Exit fullscreen mode

the implementation for bank account aggregate:



// eventSerializer serializes and deserializes the bank account events.
// It carries no state.
type eventSerializer struct {
}

// NewEventSerializer returns a new, empty eventSerializer.
func NewEventSerializer() *eventSerializer {
    return &eventSerializer{}
}

// SerializeEvent marshals a bank account domain event and wraps it, together
// with the event's own Metadata, into an es.Event of the matching event type.
// It returns the marshalling error (wrapped with the aggregate ID) if
// marshalling fails, and a wrapped ErrInvalidEvent for any event type it does
// not know.
func (s *eventSerializer) SerializeEvent(aggregate es.Aggregate, event any) (es.Event, error) {
    payload, marshalErr := serializer.Marshal(event)
    if marshalErr != nil {
        return es.Event{}, errors.Wrapf(marshalErr, "serializer.Marshal aggregateID: %s", aggregate.GetID())
    }

    // Pick the stored event type from the concrete Go type of the event.
    switch typed := event.(type) {
    case *events.EmailChangedEventV1:
        return es.NewEvent(aggregate, events.EmailChangedEventType, payload, typed.Metadata), nil
    case *events.BalanceWithdrawnEventV1:
        return es.NewEvent(aggregate, events.BalanceWithdrawnEventType, payload, typed.Metadata), nil
    case *events.BalanceDepositedEventV1:
        return es.NewEvent(aggregate, events.BalanceDepositedEventType, payload, typed.Metadata), nil
    case *events.BankAccountCreatedEventV1:
        return es.NewEvent(aggregate, events.BankAccountCreatedEventType, payload, typed.Metadata), nil
    }

    return es.Event{}, errors.Wrapf(ErrInvalidEvent, "aggregateID: %s, type: %T", aggregate.GetID(), event)
}

// DeserializeEvent turns a stored es.Event back into the bank account domain
// event matching its event type, delegating the decoding to deserializeEvent.
// An unknown event type yields a wrapped ErrInvalidEvent.
func (s *eventSerializer) DeserializeEvent(event es.Event) (any, error) {
    switch eventType := event.GetEventType(); eventType {
    case events.EmailChangedEventType:
        return deserializeEvent(event, &events.EmailChangedEventV1{})
    case events.BalanceWithdrawnEventType:
        return deserializeEvent(event, &events.BalanceWithdrawnEventV1{})
    case events.BalanceDepositedEventType:
        return deserializeEvent(event, &events.BalanceDepositedEventV1{})
    case events.BankAccountCreatedEventType:
        return deserializeEvent(event, &events.BankAccountCreatedEventV1{})
    default:
        return nil, errors.Wrapf(ErrInvalidEvent, "type: %s", eventType)
    }
}



Enter fullscreen mode Exit fullscreen mode

For the next step let's create a bank account aggregate:



const (
    // BankAccountAggregateType is the aggregate type name set on every
    // BankAccountAggregate by NewBankAccountAggregate.
    BankAccountAggregateType es.AggregateType = "BankAccount"
)

// BankAccountAggregate is the event-sourced bank account: it embeds the shared
// es.AggregateBase and holds the account state that its When method mutates.
type BankAccountAggregate struct {
    *es.AggregateBase
    // BankAccount is the current account state derived from the handled events.
    BankAccount *BankAccount
}

// NewBankAccountAggregate creates a bank account aggregate with the given ID,
// wiring its When method in as the event handler of the embedded
// AggregateBase. It returns nil for an empty ID.
func NewBankAccountAggregate(id string) *BankAccountAggregate {
    if id == "" {
        return nil
    }

    aggregate := &BankAccountAggregate{BankAccount: NewBankAccount(id)}

    // The base needs the aggregate's own When method, so it is created after
    // the aggregate value exists and attached afterwards.
    base := es.NewAggregateBase(aggregate.When)
    base.SetType(BankAccountAggregateType)
    base.SetID(id)

    aggregate.AggregateBase = base
    return aggregate
}

// When updates the bank account state according to the given event:
// a created event copies the account fields, deposit and withdraw events
// delegate to the BankAccount balance methods, and an email-changed event
// replaces the email. Any other event type yields a wrapped
// ErrUnknownEventType.
func (a *BankAccountAggregate) When(event any) error {
    switch e := event.(type) {
    case *events.BalanceDepositedEventV1:
        return a.BankAccount.DepositBalance(e.Amount)
    case *events.BalanceWithdrawnEventV1:
        return a.BankAccount.WithdrawBalance(e.Amount)
    case *events.EmailChangedEventV1:
        a.BankAccount.Email = e.Email
        return nil
    case *events.BankAccountCreatedEventV1:
        account := a.BankAccount
        account.Email = e.Email
        account.Address = e.Address
        account.Balance = e.Balance
        account.FirstName = e.FirstName
        account.LastName = e.LastName
        account.Status = e.Status
        return nil
    }

    return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "event: %#v", event)
}

// CreateNewBankAccount applies a BankAccountCreatedEventV1 carrying the given
// account details and an opening balance of amount in USD. A negative amount
// is rejected with ErrInvalidBalanceAmount. The tracing context of the span
// started here is serialized into the event's Metadata.
func (a *BankAccountAggregate) CreateNewBankAccount(ctx context.Context, email, address, firstName, lastName, status string, amount int64) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.CreateNewBankAccount")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount < 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    metadata, marshalErr := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if marshalErr != nil {
        return errors.Wrap(marshalErr, "serializer.Marshal")
    }

    return a.Apply(&events.BankAccountCreatedEventV1{
        Email:     email,
        Address:   address,
        FirstName: firstName,
        LastName:  lastName,
        Balance:   money.New(amount, money.USD),
        Status:    status,
        Metadata:  metadata,
    })
}

// DepositBalance applies a BalanceDepositedEventV1 for the given amount and
// payment ID. A zero or negative amount is rejected with
// ErrInvalidBalanceAmount. The tracing context of the span started here is
// serialized into the event's Metadata.
func (a *BankAccountAggregate) DepositBalance(ctx context.Context, amount int64, paymentID string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.DepositBalance")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount <= 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    metadata, marshalErr := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if marshalErr != nil {
        return errors.Wrap(marshalErr, "serializer.Marshal")
    }

    return a.Apply(&events.BalanceDepositedEventV1{
        Amount:    amount,
        PaymentID: paymentID,
        Metadata:  metadata,
    })
}

// WithdrawBalance applies a BalanceWithdrawnEventV1 for the given amount and
// payment ID. A zero or negative amount is rejected with
// ErrInvalidBalanceAmount, and a withdrawal that would leave the USD balance
// negative is rejected with ErrNotEnoughBalance. The tracing context of the
// span started here is serialized into the event's Metadata.
func (a *BankAccountAggregate) WithdrawBalance(ctx context.Context, amount int64, paymentID string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.WithdrawBalance")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount <= 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    // Compute what the balance would be after the withdrawal, without touching
    // the account itself; the state only changes when the event is applied.
    current := money.New(a.BankAccount.Balance.Amount(), money.USD)
    requested := money.New(amount, money.USD)
    remaining, subtractErr := current.Subtract(requested)
    if subtractErr != nil {
        return errors.Wrapf(subtractErr, "Balance.Subtract amount: %d", amount)
    }
    if remaining.IsNegative() {
        return errors.Wrapf(bankAccountErrors.ErrNotEnoughBalance, "amount: %d", amount)
    }

    metadata, marshalErr := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if marshalErr != nil {
        return errors.Wrap(marshalErr, "serializer.Marshal")
    }

    return a.Apply(&events.BalanceWithdrawnEventV1{
        Amount:    amount,
        PaymentID: paymentID,
        Metadata:  metadata,
    })
}

// ChangeEmail applies an EmailChangedEventV1 carrying the new email. The
// tracing context of the span started here is serialized into the event's
// Metadata.
func (a *BankAccountAggregate) ChangeEmail(ctx context.Context, email string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.ChangeEmail")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    metadata, marshalErr := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if marshalErr != nil {
        return errors.Wrap(marshalErr, "serializer.Marshal")
    }

    return a.Apply(&events.EmailChangedEventV1{
        Email:    email,
        Metadata: metadata,
    })
}


Enter fullscreen mode Exit fullscreen mode

Jaeger

Our microservice accepts HTTP and gRPC requests:
The bank account REST controller accepts requests, validates them using a validator,
then calls a command or query service.
The main reason for CQRS gaining popularity is the ability to handle reads
and writes separately due to severe differences in optimization techniques for those much more distinct operations.
Echo is used as the Go HTTP framework.

Http handlers:



// CreateBankAccount returns the HTTP handler that creates a bank account: it
// binds the request into a CreateBankAccountCommand, assigns a freshly
// generated UUID as the aggregate ID, validates the command and passes it to
// the CreateBankAccount command handler. On success it responds with
// 201 Created and the new aggregate ID; every failure is logged, traced and
// answered through httpErrors.ErrorCtxResponse.
func (h *bankAccountHandlers) CreateBankAccount() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.CreateBankAccount")
        defer span.Finish()
        h.metrics.HttpCreateBankAccountRequests.Inc()

        cmd := commands.CreateBankAccountCommand{}
        if err := c.Bind(&cmd); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        // The ID is always generated server-side, after binding, so nothing
        // bound from the request can set it.
        cmd.AggregateID = uuid.NewV4().String()

        if err := h.validate.StructCtx(ctx, cmd); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        if err := h.bankAccountService.Commands.CreateBankAccount.Handle(ctx, cmd); err != nil {
            h.log.Errorf("(CreateBankAccount.Handle) id: %s, err: %v", cmd.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(BankAccount created) id: %s", cmd.AggregateID)
        return c.JSON(http.StatusCreated, cmd.AggregateID)
    }
}

// DepositBalance returns the HTTP handler that deposits money into an account:
// it binds the request into a DepositBalanceCommand, takes the aggregate ID
// from the route parameter, validates the command and passes it to the
// DepositBalance command handler. On success it responds with 200 OK and no
// body; every failure is logged, traced and answered through
// httpErrors.ErrorCtxResponse.
func (h *bankAccountHandlers) DepositBalance() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.DepositBalance")
        defer span.Finish()
        h.metrics.HttpDepositBalanceRequests.Inc()

        var command commands.DepositBalanceCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        // The route parameter is authoritative for the aggregate ID.
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.DepositBalance.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(DepositBalance.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        // The format has two verbs, so the amount must be passed as well;
        // without it the log line ends in "%!d(MISSING)".
        h.log.Infof("(balance deposited) id: %s, amount: %d", command.AggregateID, command.Amount)
        return c.NoContent(http.StatusOK)
    }
}

// WithdrawBalance returns the HTTP handler that withdraws money from an
// account: it binds the request into a WithdrawBalanceCommand, takes the
// aggregate ID from the route parameter, validates the command and passes it
// to the WithdrawBalance command handler. On success it responds with 200 OK
// and no body; every failure is logged, traced and answered through
// httpErrors.ErrorCtxResponse.
func (h *bankAccountHandlers) WithdrawBalance() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.WithdrawBalance")
        defer span.Finish()
        h.metrics.HttpWithdrawBalanceRequests.Inc()

        var command commands.WithdrawBalanceCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        // The route parameter is authoritative for the aggregate ID.
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.WithdrawBalance.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(WithdrawBalance.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        // The format has two verbs, so the amount must be passed as well;
        // without it the log line ends in "%!d(MISSING)".
        // NOTE(review): assumes WithdrawBalanceCommand has an Amount field like
        // DepositBalanceCommand - confirm.
        h.log.Infof("(balance withdraw) id: %s, amount: %d", command.AggregateID, command.Amount)
        return c.NoContent(http.StatusOK)
    }
}

// ChangeEmail returns the HTTP handler that changes an account's email: it
// binds the request into a ChangeEmailCommand, takes the aggregate ID from the
// route parameter, validates the command and passes it to the ChangeEmail
// command handler. On success it responds with 200 OK and no body; every
// failure is logged, traced and answered through httpErrors.ErrorCtxResponse.
func (h *bankAccountHandlers) ChangeEmail() echo.HandlerFunc {
    return func(c echo.Context) error {
        // The span is named after this handler so its traces are not mixed
        // with those of the WithdrawBalance handler.
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.ChangeEmail")
        defer span.Finish()
        h.metrics.HttpChangeEmailRequests.Inc()

        var command commands.ChangeEmailCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        // The route parameter is authoritative for the aggregate ID.
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.ChangeEmail.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(ChangeEmail.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        // Describes the operation actually performed; an email change has no amount.
        h.log.Infof("(email changed) id: %s", command.AggregateID)
        return c.NoContent(http.StatusOK)
    }
}

// GetByID returns the HTTP handler that fetches one bank account: it binds the
// request into a GetBankAccountByIDQuery, takes the aggregate ID from the
// route parameter, reads the optional boolean "store" query parameter into
// FromEventStore, validates the query and passes it to the GetBankAccountByID
// query handler. On success it responds with 200 OK and the projection mapped
// to its HTTP form; every failure is logged, traced and answered through
// httpErrors.ErrorCtxResponse.
func (h *bankAccountHandlers) GetByID() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.GetByID")
        defer span.Finish()
        h.metrics.HttpGetBuIdRequests.Inc()

        var query queries.GetBankAccountByIDQuery
        if err := c.Bind(&query); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        // The route parameter is authoritative for the aggregate ID.
        query.AggregateID = c.Param(constants.ID)

        // "store" is optional; when present it must parse as a boolean.
        fromStore := c.QueryParam("store")
        if fromStore != "" {
            isFromStore, err := strconv.ParseBool(fromStore)
            if err != nil {
                h.log.Errorf("strconv.ParseBool err: %v", tracing.TraceWithErr(span, err))
                return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
            }
            query.FromEventStore = isFromStore
        }

        if err := h.validate.StructCtx(ctx, query); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        bankAccountProjection, err := h.bankAccountService.Queries.GetBankAccountByID.Handle(ctx, query)
        if err != nil {
            // Labelled with the query handler that actually failed.
            h.log.Errorf("(GetBankAccountByID.Handle) id: %s, err: %v", query.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(get bank account) id: %s", bankAccountProjection.AggregateID)
        return c.JSON(http.StatusOK, mappers.BankAccountMongoProjectionToHttp(bankAccountProjection))
    }
}

// Search returns the HTTP handler that searches bank accounts: it binds the
// request into a SearchBankAccountsQuery, fills the query term from the
// "search" query parameter and the pagination from the size and page query
// parameters, validates the query and passes it to the SearchBankAccounts
// query handler. On success it responds with 200 OK and the mapped result
// list with its pagination; every failure is logged, traced and answered
// through httpErrors.ErrorCtxResponse.
func (h *bankAccountHandlers) Search() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.Search")
        defer span.Finish()
        h.metrics.HttpSearchRequests.Inc()

        searchQuery := queries.SearchBankAccountsQuery{}
        if err := c.Bind(&searchQuery); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        // Query-string values are set after binding, so they always win.
        searchQuery.QueryTerm = c.QueryParam("search")
        searchQuery.Pagination = utils.NewPaginationFromQueryParams(c.QueryParam(constants.Size), c.QueryParam(constants.Page))

        if err := h.validate.StructCtx(ctx, searchQuery); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        found, err := h.bankAccountService.Queries.SearchBankAccounts.Handle(ctx, searchQuery)
        if err != nil {
            h.log.Errorf("(SearchBankAccounts.Handle) id: %s, err: %v", searchQuery.QueryTerm, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        payload := mappers.SearchResultToHttp(found.List, found.PaginationResponse)
        h.log.Infof("(search) result: %+v", payload)
        return c.JSON(http.StatusOK, payload)
    }
}


Enter fullscreen mode Exit fullscreen mode

Prometheus

I can recommend BloomRPC, which is a good GUI client for gRPC services.
gRPC service handlers:



// CreateBankAccount handles the gRPC request that opens a new bank account.
//
// A new UUID v4 string is generated as the aggregate ID and, together with the
// request fields, copied into a CreateBankAccountCommand. The command is
// validated and then passed to the CreateBankAccount command handler. On
// success the generated ID is returned in the response; validation and handler
// errors are logged and converted with grpc_errors.ErrResponse.
func (g *grpcService) CreateBankAccount(ctx context.Context, request *bankAccountService.CreateBankAccountRequest) (*bankAccountService.CreateBankAccountResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.CreateBankAccount")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcCreateBankAccountRequests.Inc()

    newID := uuid.NewV4().String()

    var cmd commands.CreateBankAccountCommand
    cmd.AggregateID = newID
    cmd.Email = request.GetEmail()
    cmd.Address = request.GetAddress()
    cmd.FirstName = request.GetFirstName()
    cmd.LastName = request.GetLastName()
    cmd.Balance = request.GetBalance()
    cmd.Status = request.GetStatus()

    validationErr := g.validate.StructCtx(ctx, cmd)
    if validationErr != nil {
        g.log.Errorf("validation err: %v", validationErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, validationErr))
    }

    if handleErr := g.bankAccountService.Commands.CreateBankAccount.Handle(ctx, cmd); handleErr != nil {
        g.log.Errorf("(CreateBankAccount.Handle) err: %v", handleErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, handleErr))
    }

    g.log.Infof("(grpcService) [created account] aggregateID: %s", newID)

    response := &bankAccountService.CreateBankAccountResponse{Id: newID}
    return response, nil
}

// DepositBalance handles the gRPC request that deposits an amount to an
// existing bank account.
//
// The request ID, amount and payment ID are copied into a
// DepositBalanceCommand, which is validated and passed to the DepositBalance
// command handler. An empty response is returned on success; validation and
// handler errors are logged and converted with grpc_errors.ErrResponse.
func (g *grpcService) DepositBalance(ctx context.Context, request *bankAccountService.DepositBalanceRequest) (*bankAccountService.DepositBalanceResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.DepositBalance")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcDepositBalanceRequests.Inc()

    var cmd commands.DepositBalanceCommand
    cmd.AggregateID = request.GetId()
    cmd.Amount = request.GetAmount()
    cmd.PaymentID = request.GetPaymentId()

    validationErr := g.validate.StructCtx(ctx, cmd)
    if validationErr != nil {
        g.log.Errorf("validation err: %v", validationErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, validationErr))
    }

    if handleErr := g.bankAccountService.Commands.DepositBalance.Handle(ctx, cmd); handleErr != nil {
        g.log.Errorf("(DepositBalance.Handle) err: %v", handleErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, handleErr))
    }

    g.log.Infof("(grpcService) [deposited balance] aggregateID: %s, amount: %v", request.GetId(), request.GetAmount())
    return &bankAccountService.DepositBalanceResponse{}, nil
}

// WithdrawBalance handles the gRPC request that withdraws an amount from an
// existing bank account.
//
// The request ID, amount and payment ID are copied into a
// WithdrawBalanceCommand, which is validated and passed to the WithdrawBalance
// command handler. An empty response is returned on success; validation and
// handler errors are logged and converted with grpc_errors.ErrResponse.
func (g *grpcService) WithdrawBalance(ctx context.Context, request *bankAccountService.WithdrawBalanceRequest) (*bankAccountService.WithdrawBalanceResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.WithdrawBalance")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcWithdrawBalanceRequests.Inc()

    var cmd commands.WithdrawBalanceCommand
    cmd.AggregateID = request.GetId()
    cmd.Amount = request.GetAmount()
    cmd.PaymentID = request.GetPaymentId()

    validationErr := g.validate.StructCtx(ctx, cmd)
    if validationErr != nil {
        g.log.Errorf("validation err: %v", validationErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, validationErr))
    }

    if handleErr := g.bankAccountService.Commands.WithdrawBalance.Handle(ctx, cmd); handleErr != nil {
        g.log.Errorf("(WithdrawBalance.Handle) err: %v", handleErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, handleErr))
    }

    g.log.Infof("(grpcService) [withdraw balance] aggregateID: %s, amount: %v", request.GetId(), request.GetAmount())
    return &bankAccountService.WithdrawBalanceResponse{}, nil
}

// ChangeEmail handles the gRPC request that changes the email of an existing
// bank account.
//
// The request ID and email are copied into a ChangeEmailCommand, which is
// validated and passed to the ChangeEmail command handler. An empty response
// is returned on success; validation and handler errors are logged and
// converted with grpc_errors.ErrResponse.
func (g *grpcService) ChangeEmail(ctx context.Context, request *bankAccountService.ChangeEmailRequest) (*bankAccountService.ChangeEmailResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.ChangeEmail")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcChangeEmailRequests.Inc()

    var cmd commands.ChangeEmailCommand
    cmd.AggregateID = request.GetId()
    cmd.NewEmail = request.GetEmail()

    validationErr := g.validate.StructCtx(ctx, cmd)
    if validationErr != nil {
        g.log.Errorf("validation err: %v", validationErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, validationErr))
    }

    if handleErr := g.bankAccountService.Commands.ChangeEmail.Handle(ctx, cmd); handleErr != nil {
        g.log.Errorf("(ChangeEmail.Handle) err: %v", handleErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, handleErr))
    }

    g.log.Infof("(grpcService) [changed email] aggregateID: %s, newEmail: %s", request.GetId(), request.GetEmail())
    return &bankAccountService.ChangeEmailResponse{}, nil
}

// GetById handles the gRPC request that fetches a single bank account.
//
// The request ID becomes the query's AggregateID and the request's IsOwner
// flag is used as the query's FromEventStore flag. The query is validated and
// passed to the GetBankAccountByID query handler; the returned projection is
// mapped to its proto form. Validation and handler errors are logged and
// converted with grpc_errors.ErrResponse.
func (g *grpcService) GetById(ctx context.Context, request *bankAccountService.GetByIdRequest) (*bankAccountService.GetByIdResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetById")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcGetBuIdRequests.Inc()

    var byIDQuery queries.GetBankAccountByIDQuery
    byIDQuery.AggregateID = request.GetId()
    byIDQuery.FromEventStore = request.IsOwner

    validationErr := g.validate.StructCtx(ctx, byIDQuery)
    if validationErr != nil {
        g.log.Errorf("validation err: %v", validationErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, validationErr))
    }

    account, handleErr := g.bankAccountService.Queries.GetBankAccountByID.Handle(ctx, byIDQuery)
    if handleErr != nil {
        g.log.Errorf("(GetBankAccountByID.Handle) err: %v", handleErr)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, handleErr))
    }

    g.log.Infof("(grpcService) [get account by id] projection: %+v", account)

    response := &bankAccountService.GetByIdResponse{
        BankAccount: mappers.BankAccountMongoProjectionToProto(account),
    }
    return response, nil
}

// SearchBankAccounts handles the gRPC request that searches bank accounts.
//
// The request's search text, size and page are copied into a
// SearchBankAccountsQuery, which is validated and passed to the
// SearchBankAccounts query handler. The resulting list and pagination data
// are mapped to their proto forms. Validation and handler errors are logged
// and converted with grpc_errors.ErrResponse.
func (g *grpcService) SearchBankAccounts(ctx context.Context, request *bankAccountService.SearchBankAccountsRequest) (*bankAccountService.SearchBankAccountsResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.SearchBankAccounts")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcSearchRequests.Inc()

    query := queries.SearchBankAccountsQuery{
        QueryTerm: request.GetSearchText(),
        Pagination: &utils.Pagination{
            Size: int(request.GetSize()),
            Page: int(request.GetPage()),
        },
    }

    if err := g.validate.StructCtx(ctx, query); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    searchQueryResult, err := g.bankAccountService.Queries.SearchBankAccounts.Handle(ctx, query)
    if err != nil {
        g.log.Errorf("(SearchBankAccounts.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    // The verb is %+v; the previous "%+vv" printed a stray literal 'v' after the value.
    g.log.Infof("(grpcService) [search] result: %+v", searchQueryResult.PaginationResponse)
    return &bankAccountService.SearchBankAccountsResponse{
        BankAccounts: mappers.SearchBankAccountsListToProto(searchQueryResult.List),
        Pagination:   mappers.PaginationResponseToProto(searchQueryResult.PaginationResponse),
    }, nil
}


Enter fullscreen mode Exit fullscreen mode

The main attribute of a command is that when the command gets successfully executed, the system transitions to a new state.
Command handlers are responsible for handling commands, mutating state or performing other side effects.
The command service handles CQRS commands: it loads the aggregate from the event store and calls its methods according to the business logic of the application;
the aggregate applies these changes, and then we save the resulting list of events in the event store.

Jaeger

The create bank account command here is simple, but in the real world the business logic is of course much more complicated: we must check email availability, etc.



// CreateBankAccountCommand carries the data required to create a new bank
// account aggregate. The `validate` tags are evaluated by the caller's
// validator (the handlers call validate.StructCtx on the command) before the
// command reaches its handler.
//
// NOTE(review): `gte=0` on the string fields presumably checks the string
// length and therefore always passes — confirm against the validator's
// semantics; `required` is what actually rejects empty values.
type CreateBankAccountCommand struct {
    // AggregateID is the ID of the aggregate to create.
    AggregateID string `json:"id" validate:"required,gte=0"`
    // Email must be present and pass the validator's `email` rule.
    Email       string `json:"email" validate:"required,gte=0,email"`
    Address     string `json:"address" validate:"required,gte=0"`
    FirstName   string `json:"firstName" validate:"required,gte=0"`
    LastName    string `json:"lastName" validate:"required,gte=0"`
    // Balance is the opening balance and must not be negative.
    // NOTE(review): presumably expressed in minor currency units — confirm.
    Balance     int64  `json:"balance" validate:"gte=0"`
    // Status is optional; no validation rule is attached.
    Status      string `json:"status"`
}

// CreateBankAccount is the command-handler contract for
// CreateBankAccountCommand.
type CreateBankAccount interface {
    // Handle executes cmd and returns a non-nil error when it fails.
    Handle(ctx context.Context, cmd CreateBankAccountCommand) error
}

// createBankAccountCmdHandler handles CreateBankAccountCommand using an
// event-sourced aggregate store.
type createBankAccountCmdHandler struct {
    log            logger.Logger
    // aggregateStore is used to check for an existing aggregate and to save the new one.
    aggregateStore es.AggregateStore
}

// NewCreateBankAccountCmdHandler returns a create-bank-account command handler
// wired with the given logger and aggregate store.
func NewCreateBankAccountCmdHandler(log logger.Logger, aggregateStore es.AggregateStore) *createBankAccountCmdHandler {
    handler := new(createBankAccountCmdHandler)
    handler.log = log
    handler.aggregateStore = aggregateStore
    return handler
}

// Handle creates a new bank account aggregate from cmd and saves it.
//
// It first asks the aggregate store whether cmd.AggregateID already exists and
// fails with an "already exists" error if so. Otherwise a new aggregate is
// constructed, CreateNewBankAccount is invoked with the command data, and the
// aggregate is saved through the aggregate store. The error returned by Save
// is passed back to the caller unchanged.
func (c *createBankAccountCmdHandler) Handle(ctx context.Context, cmd CreateBankAccountCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createBankAccountCmdHandler.Handle")
    defer span.Finish()
    span.LogFields(log.Object("command", cmd))

    alreadyStored, existsErr := c.aggregateStore.Exists(ctx, cmd.AggregateID)
    switch {
    case existsErr != nil:
        return tracing.TraceWithErr(span, existsErr)
    case alreadyStored:
        return tracing.TraceWithErr(span, errors.New("already exists"))
    }

    account := domain.NewBankAccountAggregate(cmd.AggregateID)
    if createErr := account.CreateNewBankAccount(ctx, cmd.Email, cmd.Address, cmd.FirstName, cmd.LastName, cmd.Status, cmd.Balance); createErr != nil {
        return tracing.TraceWithErr(span, createErr)
    }

    return c.aggregateStore.Save(ctx, account)
}


Enter fullscreen mode Exit fullscreen mode

Jaeger

In Event Sourcing, Projections (also known as View Models or Query Models) provide a view of the underlying event-based data model.
Often they represent the logic of translating the source write model into the read model.
The idea is that the projection will receive all the events that it is able to project and will do the normal CRUD operations on the read model it controls,
using the normal CRUD operations provided by the read model database.
Projections aren't limited to only processing events of a single entity and can assemble and aggregate data for multiple entities, even for different types of entities.
Events appended in the event store trigger the projection logic that creates or updates the read model.
We can subscribe to our projections for order-type stream events.
When we execute a command, the aggregate generates a new event that represents the state transitions of the aggregate.
Those events are committed to the store, so the store appends them to the end of the aggregate stream.
A projection receives these events and updates its read models using the When method; like an aggregate, it applies changes depending on the event type:



// ProcessMessagesErrGroup is the worker loop of the Mongo subscription.
//
// It repeatedly fetches a message from the Kafka reader and, when the message
// topic is the bank account aggregate topic, hands it to
// handleBankAccountEvents. Fetch errors are logged as warnings and the loop
// keeps going. The loop only returns when ctx is done, in which case ctx.Err()
// is returned. workerID is used for logging only.
func (s *mongoSubscription) ProcessMessagesErrGroup(ctx context.Context, r *kafka.Reader, workerID int) error {
    for {
        // Non-blocking cancellation check before every fetch.
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        msg, fetchErr := r.FetchMessage(ctx)
        if fetchErr != nil {
            s.log.Warnf("(mongoSubscription) workerID: %d, err: %v", workerID, fetchErr)
            continue
        }

        bankAccountTopic := es.GetTopicName(s.cfg.KafkaPublisherConfig.TopicPrefix, string(domain.BankAccountAggregateType))
        if msg.Topic == bankAccountTopic {
            s.handleBankAccountEvents(ctx, r, msg)
        }
    }
}

// handleBankAccountEvents processes one Kafka message holding a serialized
// list of events.
//
// If the payload cannot be unmarshalled, the error is logged and the message
// is passed to commitErrMessage. Otherwise every event is passed to handle in
// order; processing stops at the first event that fails, without calling
// commitMessage here. When all events succeed the message is committed.
func (s *mongoSubscription) handleBankAccountEvents(ctx context.Context, r *kafka.Reader, m kafka.Message) {
    ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "mongoSubscription.handleBankAccountEvents")
    defer span.Finish()

    var batch []es.Event
    decodeErr := serializer.Unmarshal(m.Value, &batch)
    if decodeErr != nil {
        s.log.Errorf("serializer.Unmarshal: %v", tracing.TraceWithErr(span, decodeErr))
        s.commitErrMessage(ctx, r, m)
        return
    }

    for i := range batch {
        if handleErr := s.handle(ctx, r, m, batch[i]); handleErr != nil {
            return
        }
    }

    s.commitMessage(ctx, r, m)
}

// handle applies a single event to the projection.
//
// On success it logs the event and returns nil. When the projection's When
// method fails, the error is logged and recreateProjection is called for the
// event: if that also fails, its error (annotated with the original error) is
// returned; if it succeeds, the message is passed to commitErrMessage and the
// original error is still returned, wrapped with the event type and aggregate
// ID.
func (s *mongoSubscription) handle(ctx context.Context, r *kafka.Reader, m kafka.Message, event es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoSubscription.handle")
    defer span.Finish()

    whenErr := s.projection.When(ctx, event)
    if whenErr == nil {
        s.log.Infof("MongoSubscription <<<commit>>> event: %s", event.String())
        return nil
    }

    s.log.Errorf("MongoSubscription When err: %v", whenErr)

    if recreateErr := s.recreateProjection(ctx, event); recreateErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(recreateErr, "recreateProjection err: %v", whenErr))
    }

    s.commitErrMessage(ctx, r, m)
    return tracing.TraceWithErr(span, errors.Wrapf(whenErr, "When type: %s, aggregateID: %s", event.GetEventType(), event.GetAggregateID()))
}


Enter fullscreen mode Exit fullscreen mode

mongo
The MongoDB projection handles events by implementing the When method:
it handles each event, applies the changes like an aggregate does, and then saves the result using the MongoDB repository:



// bankAccountMongoProjection applies bank account events to the MongoDB read
// model through its When method.
type bankAccountMongoProjection struct {
    log             logger.Logger
    cfg             *config.Config
    // serializer turns stored es.Event values into typed domain events.
    serializer      es.Serializer
    // mongoRepository stores and updates the projection documents.
    mongoRepository domain.MongoRepository
}

// NewBankAccountMongoProjection returns a MongoDB projection wired with the
// given logger, configuration, event serializer and repository.
func NewBankAccountMongoProjection(
    log logger.Logger,
    cfg *config.Config,
    serializer es.Serializer,
    mongoRepository domain.MongoRepository,
) *bankAccountMongoProjection {
    projection := new(bankAccountMongoProjection)
    projection.log = log
    projection.cfg = cfg
    projection.serializer = serializer
    projection.mongoRepository = mongoRepository
    return projection
}

// When deserializes esEvent and dispatches it to the handler matching its
// concrete event type. A deserialization failure is returned wrapped with the
// aggregate ID and event type; an event of an unsupported type yields
// bankAccountErrors.ErrUnknownEventType.
func (b *bankAccountMongoProjection) When(ctx context.Context, esEvent es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.When")
    defer span.Finish()

    payload, decodeErr := b.serializer.DeserializeEvent(esEvent)
    if decodeErr != nil {
        return errors.Wrapf(decodeErr, "serializer.DeserializeEvent aggregateID: %s, type: %s", esEvent.GetAggregateID(), esEvent.GetEventType())
    }

    var result error
    switch typed := payload.(type) {
    case *events.BankAccountCreatedEventV1:
        result = b.onBankAccountCreated(ctx, esEvent, typed)
    case *events.BalanceDepositedEventV1:
        result = b.onBalanceDeposited(ctx, esEvent, typed)
    case *events.BalanceWithdrawnEventV1:
        result = b.onBalanceWithdrawn(ctx, esEvent, typed)
    case *events.EmailChangedEventV1:
        result = b.onEmailChanged(ctx, esEvent, typed)
    default:
        result = errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "esEvent: %s", esEvent.String())
    }
    return result
}

// onBankAccountCreated inserts the initial MongoDB projection for a newly
// created bank account.
//
// The creation event must carry version 1; any other version is rejected with
// es.ErrInvalidEventVersion. CreatedAt and UpdatedAt are set from a single
// timestamp so that a fresh document never has UpdatedAt earlier than
// CreatedAt. Insert failures are returned wrapped with the aggregate ID.
func (b *bankAccountMongoProjection) onBankAccountCreated(ctx context.Context, esEvent es.Event, event *events.BankAccountCreatedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBankAccountCreated")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if esEvent.GetVersion() != 1 {
        return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, version: %d", esEvent.GetEventType(), esEvent.GetVersion())
    }

    // Read the clock once: two separate time.Now() calls produced an UpdatedAt
    // that was slightly older than CreatedAt.
    now := time.Now().UTC()

    projection := &domain.BankAccountMongoProjection{
        AggregateID: esEvent.GetAggregateID(),
        Version:     esEvent.GetVersion(),
        Email:       event.Email,
        Address:     event.Address,
        FirstName:   event.FirstName,
        LastName:    event.LastName,
        Balance: domain.Balance{
            Amount:   event.Balance.AsMajorUnits(),
            Currency: event.Balance.Currency().Code,
        },
        Status:    event.Status,
        UpdatedAt: now,
        CreatedAt: now,
    }

    err := b.mongoRepository.Insert(ctx, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBankAccountCreated] mongoRepository.Insert aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBankAccountCreated] projection: %#v", projection)
    return nil
}

// onBalanceDeposited adds the deposited amount to the projection's balance.
//
// The update goes through mongoRepository.UpdateConcurrently with an expected
// version of (event version - 1); the callback increases the balance by the
// event amount (built as a money.USD value and converted to major units) and
// sets the projection version to the event version.
func (b *bankAccountMongoProjection) onBalanceDeposited(ctx context.Context, esEvent es.Event, event *events.BalanceDepositedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBalanceDeposited")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    applyDeposit := func(doc *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        doc.Balance.Amount += money.New(event.Amount, money.USD).AsMajorUnits()
        doc.Version = esEvent.GetVersion()
        return doc
    }
    expectedVersion := esEvent.GetVersion() - 1

    updateErr := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), applyDeposit, expectedVersion)
    if updateErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(updateErr, "[onBalanceDeposited] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBalanceDeposited] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}

// onBalanceWithdrawn subtracts the withdrawn amount from the projection's
// balance.
//
// The update goes through mongoRepository.UpdateConcurrently with an expected
// version of (event version - 1); the callback decreases the balance by the
// event amount (built as a money.USD value and converted to major units) and
// sets the projection version to the event version.
func (b *bankAccountMongoProjection) onBalanceWithdrawn(ctx context.Context, esEvent es.Event, event *events.BalanceWithdrawnEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBalanceWithdrawn")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    applyWithdrawal := func(doc *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        doc.Balance.Amount -= money.New(event.Amount, money.USD).AsMajorUnits()
        doc.Version = esEvent.GetVersion()
        return doc
    }
    expectedVersion := esEvent.GetVersion() - 1

    updateErr := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), applyWithdrawal, expectedVersion)
    if updateErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(updateErr, "[onBalanceWithdrawn] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBalanceWithdrawn] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}

// onEmailChanged replaces the projection's email with the one from the event.
//
// The update goes through mongoRepository.UpdateConcurrently with an expected
// version of (event version - 1); the callback sets the new email and sets the
// projection version to the event version.
func (b *bankAccountMongoProjection) onEmailChanged(ctx context.Context, esEvent es.Event, event *events.EmailChangedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onEmailChanged")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    applyEmail := func(doc *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        doc.Email = event.Email
        doc.Version = esEvent.GetVersion()
        return doc
    }
    expectedVersion := esEvent.GetVersion() - 1

    updateErr := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), applyEmail, expectedVersion)
    if updateErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(updateErr, "[onEmailChanged] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onEmailChanged] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}


Enter fullscreen mode Exit fullscreen mode

kibana

The ElasticSearch projection does the same thing, indexing documents for searching:



// elasticProjection applies bank account events to the ElasticSearch read
// model through its When method.
type elasticProjection struct {
    log               logger.Logger
    cfg               *config.Config
    // serializer turns stored es.Event values into typed domain events.
    serializer        es.Serializer
    // elasticSearchRepo indexes, loads and updates the projection documents.
    elasticSearchRepo domain.ElasticSearchRepository
}

// NewElasticProjection returns an ElasticSearch projection wired with the
// given logger, configuration, event serializer and repository.
func NewElasticProjection(log logger.Logger, cfg *config.Config, serializer es.Serializer, elasticSearchRepo domain.ElasticSearchRepository) *elasticProjection {
    projection := new(elasticProjection)
    projection.log = log
    projection.cfg = cfg
    projection.serializer = serializer
    projection.elasticSearchRepo = elasticSearchRepo
    return projection
}

// When deserializes esEvent and dispatches it to the handler matching its
// concrete event type. A deserialization failure is returned wrapped with the
// aggregate ID and event type; an event of an unsupported type yields
// bankAccountErrors.ErrUnknownEventType.
func (e *elasticProjection) When(ctx context.Context, esEvent es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.When")
    defer span.Finish()

    payload, decodeErr := e.serializer.DeserializeEvent(esEvent)
    if decodeErr != nil {
        return errors.Wrapf(decodeErr, "serializer.DeserializeEvent aggregateID: %s, type: %s", esEvent.GetAggregateID(), esEvent.GetEventType())
    }

    var result error
    switch typed := payload.(type) {
    case *events.BankAccountCreatedEventV1:
        result = e.onBankAccountCreated(ctx, esEvent, typed)
    case *events.BalanceDepositedEventV1:
        result = e.onBalanceDeposited(ctx, esEvent, typed)
    case *events.BalanceWithdrawnEventV1:
        result = e.onBalanceWithdrawn(ctx, esEvent, typed)
    case *events.EmailChangedEventV1:
        result = e.onEmailChanged(ctx, esEvent, typed)
    default:
        result = errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "esEvent: %s", esEvent.String())
    }
    return result
}

// onBankAccountCreated indexes the initial ElasticSearch document for a newly
// created bank account.
//
// The creation event must carry version 1; any other version is rejected with
// es.ErrInvalidEventVersion. The aggregate ID is used both as the document ID
// and as the AggregateID field. CreatedAt and UpdatedAt are set from a single
// timestamp so that a fresh document never has UpdatedAt earlier than
// CreatedAt. Index failures are returned wrapped with the aggregate ID.
func (e *elasticProjection) onBankAccountCreated(ctx context.Context, esEvent es.Event, event *events.BankAccountCreatedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBankAccountCreated")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if esEvent.GetVersion() != 1 {
        return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, version: %d", esEvent.GetEventType(), esEvent.GetVersion())
    }

    // Read the clock once: two separate time.Now() calls produced an UpdatedAt
    // that was slightly older than CreatedAt.
    now := time.Now().UTC()

    projection := &domain.ElasticSearchProjection{
        ID:          esEvent.GetAggregateID(),
        AggregateID: esEvent.GetAggregateID(),
        Version:     esEvent.GetVersion(),
        Email:       event.Email,
        Address:     event.Address,
        FirstName:   event.FirstName,
        LastName:    event.LastName,
        Balance: domain.Balance{
            Amount:   event.Balance.AsMajorUnits(),
            Currency: event.Balance.Currency().Code,
        },
        Status:    event.Status,
        UpdatedAt: now,
        CreatedAt: now,
    }

    err := e.elasticSearchRepo.Index(ctx, projection)
    if err != nil {
        // The label names this handler; it previously said [onBalanceDeposited].
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBankAccountCreated] elasticSearchRepo.Index aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBankAccountCreated] projection: %s", projection)
    return nil
}

// onBalanceDeposited adds the deposited amount to the indexed document's
// balance.
//
// The document is loaded by aggregate ID and its version is checked with
// validateEventVersion. The balance is then increased by the event amount
// (built as a money.USD value and converted to major units), the document
// version is set to the event version, and the document is written back with
// elasticSearchRepo.Update. Every failure is returned wrapped with the
// aggregate ID.
func (e *elasticProjection) onBalanceDeposited(ctx context.Context, esEvent es.Event, event *events.BalanceDepositedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBalanceDeposited")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
        return tracing.TraceWithErr(span, err)
    }

    projection.Balance.Amount += money.New(event.Amount, money.USD).AsMajorUnits()
    projection.Version = esEvent.GetVersion()

    if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
        // The label names this handler; it previously said [onBalanceWithdrawn].
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBalanceDeposited] projection: %s", projection)
    return nil
}

// onBalanceWithdrawn subtracts the withdrawn amount from the indexed
// document's balance.
//
// The document is loaded by aggregate ID and its version is checked with
// validateEventVersion. The balance is then decreased by the event amount
// (built as a money.USD value and converted to major units), the document
// version is set to the event version, and the document is written back with
// elasticSearchRepo.Update.
func (e *elasticProjection) onBalanceWithdrawn(ctx context.Context, esEvent es.Event, event *events.BalanceWithdrawnEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBalanceWithdrawn")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    doc, loadErr := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if loadErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(loadErr, "[onBalanceWithdrawn] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    versionErr := e.validateEventVersion(doc.Version, esEvent)
    if versionErr != nil {
        return tracing.TraceWithErr(span, versionErr)
    }

    doc.Balance.Amount -= money.New(event.Amount, money.USD).AsMajorUnits()
    doc.Version = esEvent.GetVersion()

    updateErr := e.elasticSearchRepo.Update(ctx, doc)
    if updateErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(updateErr, "[onBalanceWithdrawn] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBalanceWithdrawn] projection: %s", doc)
    return nil
}

// onEmailChanged replaces the indexed document's email with the one from the
// event.
//
// The document is loaded by aggregate ID and its version is checked with
// validateEventVersion. The email and version are then updated from the event
// and the document is written back with elasticSearchRepo.Update.
func (e *elasticProjection) onEmailChanged(ctx context.Context, esEvent es.Event, event *events.EmailChangedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onEmailChanged")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    doc, loadErr := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if loadErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(loadErr, "[onEmailChanged] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    versionErr := e.validateEventVersion(doc.Version, esEvent)
    if versionErr != nil {
        return tracing.TraceWithErr(span, versionErr)
    }

    doc.Email = event.Email
    doc.Version = esEvent.GetVersion()

    updateErr := e.elasticSearchRepo.Update(ctx, doc)
    if updateErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(updateErr, "[onEmailChanged] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onEmailChanged] projection: %s", doc)
    return nil
}

// validateEventVersion checks that the event directly follows the projection:
// version (the projection's current version) must equal the event version
// minus one. Otherwise es.ErrInvalidEventVersion is returned, wrapped with the
// event type and both versions.
func (e *elasticProjection) validateEventVersion(version uint64, esEvent es.Event) error {
    expected := esEvent.GetVersion() - 1
    if version == expected {
        return nil
    }
    return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, eventVersion: %d, projectionVersion: %d", esEvent.GetEventType(), esEvent.GetVersion(), version)
}


Enter fullscreen mode Exit fullscreen mode

postman

Queries in CQRS represent the intention to get data and are responsible for returning the result of the requested query.
The read model can be, but doesn't have to be, derived from the write model.
It's a transformation of the results of the business operation into a readable form.
One of the great outcomes of having an event-sourced system is the ability to create new read models at will,
at any time, without affecting anything else.
Then we can retrieve projection data using query:

bloomprc

The get-bank-account-by-ID query can load the account from MongoDB or, if required, from the aggregate store:



// GetBankAccountByIDQuery asks for a single bank account by its aggregate ID.
type GetBankAccountByIDQuery struct {
    // AggregateID identifies the bank account to load; it is required.
    AggregateID    string `json:"aggregateID" validate:"required,gte=0"`
    // FromEventStore makes the handler load the account from the aggregate
    // store instead of reading the MongoDB projection.
    FromEventStore bool   `json:"fromEventStore"`
}

// GetBankAccountByID is the query-handler contract for
// GetBankAccountByIDQuery.
type GetBankAccountByID interface {
    // Handle executes query and returns the bank account as a Mongo projection.
    Handle(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error)
}

// getBankAccountByIDQuery handles GetBankAccountByIDQuery using the MongoDB
// read model and the aggregate store.
type getBankAccountByIDQuery struct {
    log             logger.Logger
    // aggregateStore loads the aggregate when the projection is bypassed or missing.
    aggregateStore  es.AggregateStore
    // mongoRepository reads the projection and upserts a rebuilt one.
    mongoRepository domain.MongoRepository
}

// NewGetBankAccountByIDQuery returns a get-by-ID query handler wired with the
// given logger, aggregate store and Mongo repository.
func NewGetBankAccountByIDQuery(log logger.Logger, aggregateStore es.AggregateStore, mongoRepository domain.MongoRepository) *getBankAccountByIDQuery {
    handler := new(getBankAccountByIDQuery)
    handler.log = log
    handler.aggregateStore = aggregateStore
    handler.mongoRepository = mongoRepository
    return handler
}

// Handle returns the bank account identified by query.AggregateID.
//
// When query.FromEventStore is set the account is loaded through
// loadFromAggregateStore. Otherwise the MongoDB projection is read. If MongoDB
// reports mongo.ErrNoDocuments, the aggregate is loaded from the aggregate
// store instead: an aggregate with version 0 yields ErrBankAccountNotFound,
// any other is mapped to a projection, upserted into MongoDB on a best-effort
// basis (an upsert failure is only logged) and returned. Any other MongoDB
// error is returned to the caller.
func (q *getBankAccountByIDQuery) Handle(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getBankAccountByIDQuery.Handle")
    defer span.Finish()
    span.LogFields(log.Object("query", query))

    if query.FromEventStore {
        return q.loadFromAggregateStore(ctx, query)
    }

    stored, findErr := q.mongoRepository.GetByAggregateID(ctx, query.AggregateID)
    if findErr == nil {
        q.log.Debugf("(GetBankAccountByIDQuery) from mongo %+v", query)
        return stored, nil
    }
    if !errors.Is(findErr, mongo.ErrNoDocuments) {
        return nil, tracing.TraceWithErr(span, findErr)
    }

    // The projection is missing: rebuild it from the aggregate store.
    aggregate := domain.NewBankAccountAggregate(query.AggregateID)
    if loadErr := q.aggregateStore.Load(ctx, aggregate); loadErr != nil {
        return nil, tracing.TraceWithErr(span, loadErr)
    }
    if aggregate.GetVersion() == 0 {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(bankAccountErrors.ErrBankAccountNotFound, "id: %s", query.AggregateID))
    }

    rebuilt := mappers.BankAccountToMongoProjection(aggregate)
    if upsertErr := q.mongoRepository.Upsert(ctx, rebuilt); upsertErr != nil {
        q.log.Errorf("(GetBankAccountByIDQuery) mongo Upsert AggregateID: %s, err: %v", query.AggregateID, tracing.TraceWithErr(span, upsertErr))
    }
    q.log.Debugf("(GetBankAccountByIDQuery) Upsert %+v", query)
    return rebuilt, nil
}

// loadFromAggregateStore loads the bank account aggregate from the aggregate
// store and maps it to a Mongo projection. A load error is returned as is
// (after tracing); an aggregate whose version is 0 yields
// ErrBankAccountNotFound.
func (q *getBankAccountByIDQuery) loadFromAggregateStore(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getBankAccountByIDQuery.loadFromAggregateStore")
    defer span.Finish()

    aggregate := domain.NewBankAccountAggregate(query.AggregateID)
    loadErr := q.aggregateStore.Load(ctx, aggregate)
    switch {
    case loadErr != nil:
        return nil, tracing.TraceWithErr(span, loadErr)
    case aggregate.GetVersion() == 0:
        return nil, tracing.TraceWithErr(span, errors.Wrapf(bankAccountErrors.ErrBankAccountNotFound, "id: %s", query.AggregateID))
    }

    q.log.Debugf("(GetBankAccountByIDQuery) from aggregateStore bankAccountAggregate: %+v", aggregate.BankAccount)
    return mappers.BankAccountToMongoProjection(aggregate), nil
}


Enter fullscreen mode Exit fullscreen mode

Jaeger

Bank account Mongo repository methods, using the official MongoDB Go client:



// bankAccountMongoRepository stores bank account projections in MongoDB.
type bankAccountMongoRepository struct {
    log logger.Logger
    cfg *config.Config
    // db is the MongoDB client used for collection access and sessions.
    db  *mongo.Client
}

// NewBankAccountMongoRepository returns a repository wired with the given
// logger, configuration and MongoDB client.
func NewBankAccountMongoRepository(log logger.Logger, cfg *config.Config, db *mongo.Client) *bankAccountMongoRepository {
    repository := new(bankAccountMongoRepository)
    repository.log = log
    repository.cfg = cfg
    repository.db = db
    return repository
}

// Insert stores projection as a new document in the bank accounts collection.
// An InsertOne failure is returned wrapped with the aggregate ID.
func (b *bankAccountMongoRepository) Insert(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Insert")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    if _, insertErr := b.bankAccountsCollection().InsertOne(ctx, projection); insertErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(insertErr, "[InsertOne] AggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Insert] result AggregateID: %s", projection.AggregateID)
    return nil
}

// Update overwrites the stored document that has the projection's aggregate ID.
//
// The projection's ID is cleared and its UpdatedAt is set to the current UTC
// time before it is sent as a $set update. Upsert is disabled, and the
// document as it looks after the update is decoded back into projection.
// A failure is returned wrapped with the aggregate ID.
func (b *bankAccountMongoRepository) Update(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.ID = ""
    projection.UpdatedAt = time.Now().UTC()

    updateOptions := options.FindOneAndUpdate().
        SetReturnDocument(options.After).
        SetUpsert(false)
    byAggregateID := bson.M{constants.MongoAggregateID: projection.AggregateID}
    setProjection := bson.M{"$set": projection}

    updated := b.bankAccountsCollection().FindOneAndUpdate(ctx, byAggregateID, setProjection, updateOptions)
    if decodeErr := updated.Decode(projection); decodeErr != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(decodeErr, "[FindOneAndUpdate] aggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Update] result AggregateID: %s", projection.AggregateID)
    return nil
}

// UpdateConcurrently loads the projection for aggregateID, verifies that its stored
// version equals expectedVersion, applies updateCb and writes the result back, all
// inside a single MongoDB session transaction.
//
// Parameters:
//   - aggregateID: value matched against constants.MongoAggregateID.
//   - updateCb: callback that receives the loaded projection and returns the one to store.
//   - expectedVersion: version the stored projection must currently have.
//
// Returns a wrapped es.ErrInvalidEventVersion when the versions differ, or the wrapped
// driver error when any session, read, write or commit step fails. On any failure an
// abort is attempted; an abort failure is only logged so that the root cause is the
// error handed back to the caller.
func (b *bankAccountMongoRepository) UpdateConcurrently(ctx context.Context, aggregateID string, updateCb domain.UpdateProjectionCallback, expectedVersion uint64) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.UpdateConcurrently")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    session, err := b.db.StartSession()
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "StartSession aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
    }
    defer session.EndSession(ctx)

    err = mongo.WithSession(ctx, session, func(sessionContext mongo.SessionContext) error {
        if err := session.StartTransaction(); err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "StartTransaction aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        filter := bson.M{constants.MongoAggregateID: aggregateID}
        foundProjection := &domain.BankAccountMongoProjection{}

        // Collection operations must be given sessionContext (not the outer ctx),
        // otherwise the driver does not associate them with this session's transaction.
        err := b.bankAccountsCollection().FindOne(sessionContext, filter).Decode(foundProjection)
        if err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOne] aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        // Optimistic concurrency check: refuse to apply the callback on a stale version.
        if foundProjection.Version != expectedVersion {
            return tracing.TraceWithErr(span, errors.Wrapf(es.ErrInvalidEventVersion, "[FindOne] aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        foundProjection = updateCb(foundProjection)

        foundProjection.ID = ""
        foundProjection.UpdatedAt = time.Now().UTC()

        ops := options.FindOneAndUpdate()
        ops.SetReturnDocument(options.After)
        ops.SetUpsert(false)
        filter = bson.M{constants.MongoAggregateID: foundProjection.AggregateID}

        err = b.bankAccountsCollection().FindOneAndUpdate(sessionContext, filter, bson.M{"$set": foundProjection}, ops).Decode(foundProjection)
        if err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOneAndUpdate] aggregateID: %s, expectedVersion: %d", foundProjection.AggregateID, expectedVersion))
        }

        b.log.Infof("[UpdateConcurrently] result AggregateID: %s, expectedVersion: %d", foundProjection.AggregateID, expectedVersion)
        return session.CommitTransaction(sessionContext)
    })
    if err != nil {
        // Best-effort rollback. The abort itself can fail (for example when the
        // transaction never started), and that must not replace the original error.
        if abortErr := session.AbortTransaction(ctx); abortErr != nil {
            b.log.Warnf("AbortTransaction aggregateID: %s, expectedVersion: %d, err: %v", aggregateID, expectedVersion, abortErr)
        }
        return tracing.TraceWithErr(span, errors.Wrapf(err, "mongo.WithSession aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
    }

    return nil
}

// Upsert writes the projection with a "$set" FindOneAndUpdate that has upsert enabled,
// so the document matching projection.AggregateID is updated or created if missing.
//
// The passed projection is mutated: UpdatedAt is set to the current UTC time before
// the write, and the document returned after the update is decoded back into it.
// A failure is wrapped with the aggregate ID and passed through tracing.TraceWithErr.
func (b *bankAccountMongoRepository) Upsert(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    // The span is named after this method so traces do not get mixed up with Update.
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Upsert")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.UpdatedAt = time.Now().UTC()

    ops := options.FindOneAndUpdate()
    ops.SetReturnDocument(options.After)
    ops.SetUpsert(true)
    filter := bson.M{constants.MongoAggregateID: projection.AggregateID}

    err := b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": projection}, ops).Decode(projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "Upsert [FindOneAndUpdate] aggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Upsert] result AggregateID: %s", projection.AggregateID)
    return nil
}

// DeleteByAggregateID removes the document whose constants.MongoAggregateID field
// equals aggregateID using DeleteOne, and logs the reported delete count.
//
// A driver failure is wrapped with the aggregate ID and passed through
// tracing.TraceWithErr before being returned.
func (b *bankAccountMongoRepository) DeleteByAggregateID(ctx context.Context, aggregateID string) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.DeleteByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    filter := bson.M{constants.MongoAggregateID: aggregateID}
    ops := options.Delete()

    result, err := b.bankAccountsCollection().DeleteOne(ctx, filter, ops)
    if err != nil {
        // The label names the driver call that actually failed (DeleteOne).
        return tracing.TraceWithErr(span, errors.Wrapf(err, "DeleteByAggregateID [DeleteOne] aggregateID: %s", aggregateID))
    }

    b.log.Debugf("[DeleteByAggregateID] result AggregateID: %s, deleteCount: %d", aggregateID, result.DeletedCount)
    return nil
}

// GetByAggregateID looks up the document whose constants.MongoAggregateID field
// equals aggregateID and decodes it into a BankAccountMongoProjection.
//
// Returns a pointer to the decoded projection, or nil and the wrapped FindOne/Decode
// error (passed through tracing.TraceWithErr) when the lookup fails.
func (b *bankAccountMongoRepository) GetByAggregateID(ctx context.Context, aggregateID string) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.GetByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    filter := bson.M{constants.MongoAggregateID: aggregateID}
    var projection domain.BankAccountMongoProjection

    err := b.bankAccountsCollection().FindOne(ctx, filter).Decode(&projection)
    if err != nil {
        // Report the requested ID: on failure the projection was not populated,
        // so its AggregateID field would print as an empty string.
        return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOne] aggregateID: %s", aggregateID))
    }

    b.log.Debugf("[GetByAggregateID] result projection: %+v", projection)
    return &projection, nil
}

// bankAccountsCollection resolves the bank accounts collection handle from the
// database and collection names held in the repository configuration.
func (b *bankAccountMongoRepository) bankAccountsCollection() *mongo.Collection {
    database := b.db.Database(b.cfg.Mongo.Db)
    return database.Collection(b.cfg.MongoCollections.BankAccounts)
}


Enter fullscreen mode Exit fullscreen mode

The ElasticSearch repository implementation uses the official go-elasticsearch library.
Another good option is olivere/elastic, but it does not support version 8, which is used for this project.



// elasticRepo bundles the dependencies used by the ElasticSearch projection
// methods defined on it.
type elasticRepo struct {
    // log receives the Infof/Warnf output emitted by the repository methods.
    log    logger.Logger
    // cfg supplies the index name (cfg.ElasticIndexes.BankAccounts) used by every call.
    cfg    *config.Config
    // client is the ElasticSearch client handed to the esclient helper functions.
    client *elasticsearch.Client
}

// NewElasticRepository returns a repository wired with the given logger,
// configuration and ElasticSearch client.
func NewElasticRepository(log logger.Logger, cfg *config.Config, client *elasticsearch.Client) *elasticRepo {
    repo := &elasticRepo{
        log:    log,
        cfg:    cfg,
        client: client,
    }
    return repo
}

// Index sends the projection to the bank accounts index through esclient.Index,
// using projection.AggregateID as the document ID.
//
// A transport error or an error response is wrapped and passed through
// tracing.TraceWithErr; response warnings are only logged.
func (e *elasticRepo) Index(ctx context.Context, projection *domain.ElasticSearchProjection) error {
    traceSpan, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Index")
    defer traceSpan.Finish()
    traceSpan.LogFields(log.String("aggregateID", projection.AggregateID))

    indexName := e.cfg.ElasticIndexes.BankAccounts
    res, reqErr := esclient.Index(ctx, e.client, indexName, projection.AggregateID, projection)
    if reqErr != nil {
        return tracing.TraceWithErr(traceSpan, errors.Wrapf(reqErr, "esclient.Index id: %s", projection.AggregateID))
    }
    defer res.Body.Close()

    switch {
    case res.IsError():
        return tracing.TraceWithErr(traceSpan, errors.Wrapf(errors.New("ElasticSearch request err"), "response.IsError id: %s", projection.AggregateID))
    case res.HasWarnings():
        e.log.Warnf("ElasticSearch Index warnings: %+v", res.Warnings())
    }

    e.log.Infof("ElasticSearch index result: %s", res.String())
    return nil
}

// Update stamps the projection with the current UTC time and sends it to the bank
// accounts index through esclient.Update, keyed by projection.AggregateID.
//
// A transport error or an error response is wrapped and passed through
// tracing.TraceWithErr; response warnings are only logged.
func (e *elasticRepo) Update(ctx context.Context, projection *domain.ElasticSearchProjection) error {
    traceSpan, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Update")
    defer traceSpan.Finish()
    traceSpan.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.UpdatedAt = time.Now().UTC()

    indexName := e.cfg.ElasticIndexes.BankAccounts
    res, reqErr := esclient.Update(ctx, e.client, indexName, projection.AggregateID, projection)
    if reqErr != nil {
        return tracing.TraceWithErr(traceSpan, errors.Wrapf(reqErr, "esclient.Update id: %s", projection.AggregateID))
    }
    defer res.Body.Close()

    switch {
    case res.IsError():
        return tracing.TraceWithErr(traceSpan, errors.Wrapf(errors.New("ElasticSearch request err"), "response.IsError id: %s", projection.AggregateID))
    case res.HasWarnings():
        e.log.Warnf("ElasticSearch Update warnings: %+v", res.Warnings())
    }

    e.log.Infof("ElasticSearch update result: %s", res.String())
    return nil
}

// DeleteByAggregateID removes the document with ID aggregateID from the bank
// accounts index through esclient.Delete.
//
// An error response with status 404 is not treated as a failure; any other error
// response, or a transport error, is wrapped and passed through tracing.TraceWithErr.
// Response warnings are only logged.
func (e *elasticRepo) DeleteByAggregateID(ctx context.Context, aggregateID string) error {
    traceSpan, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.DeleteByAggregateID")
    defer traceSpan.Finish()
    traceSpan.LogFields(log.String("aggregateID", aggregateID))

    res, reqErr := esclient.Delete(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, aggregateID)
    if reqErr != nil {
        return tracing.TraceWithErr(traceSpan, errors.Wrapf(reqErr, "esclient.Delete id: %s", aggregateID))
    }
    defer res.Body.Close()

    // "Not found" is tolerated; every other error status fails the call.
    failed := res.IsError() && res.StatusCode != http.StatusNotFound
    switch {
    case failed:
        return tracing.TraceWithErr(traceSpan, errors.Wrapf(errors.New("ElasticSearch delete"), "response.IsError aggregateID: %s, status: %s", aggregateID, res.Status()))
    case res.HasWarnings():
        e.log.Warnf("ElasticSearch Delete warnings: %+v", res.Warnings())
    }

    e.log.Infof("ElasticSearch delete result: %s", res.String())
    return nil
}

// GetByAggregateID fetches the document with ID aggregateID from the bank accounts
// index through esclient.GetByID and returns the Source of the response.
//
// Returns nil and the wrapped error (passed through tracing.TraceWithErr) when
// esclient.GetByID fails.
func (e *elasticRepo) GetByAggregateID(ctx context.Context, aggregateID string) (*domain.ElasticSearchProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.GetByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    response, err := esclient.GetByID[*domain.ElasticSearchProjection](ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, aggregateID)
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.GetByID id: %s", aggregateID))
    }

    // The log line names the get operation so it is not confused with delete output.
    e.log.Infof("ElasticSearch get result: %+v", response)
    return response.Source, nil
}

// Search runs esclient.SearchMultiMatchPrefix for term against the bank accounts
// index, using the paging and field settings from options.
//
// Sorting defaults to "balance.amount" and is replaced by options.Sort when that
// is non-nil; the sort map is always {"balance.amount": "asc"}. A failing search is
// wrapped with the term and passed through tracing.TraceWithErr.
func (e *elasticRepo) Search(ctx context.Context, term string, options esclient.SearchOptions) (*esclient.SearchListResponse[*domain.ElasticSearchProjection], error) {
    traceSpan, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Search")
    defer traceSpan.Finish()
    traceSpan.LogFields(log.String("term", term))

    request := esclient.SearchMatchPrefixRequest{
        Index:   []string{e.cfg.ElasticIndexes.BankAccounts},
        Term:    term,
        Fields:  options.Fields,
        From:    options.From,
        Size:    options.Size,
        Sort:    []string{"balance.amount"},
        SortMap: map[string]interface{}{"balance.amount": "asc"},
    }
    // A caller-supplied sort list takes precedence over the default.
    if options.Sort != nil {
        request.Sort = options.Sort
    }

    found, searchErr := esclient.SearchMultiMatchPrefix[*domain.ElasticSearchProjection](ctx, e.client, request)
    if searchErr != nil {
        return nil, tracing.TraceWithErr(traceSpan, errors.Wrapf(searchErr, "esclient.SearchMultiMatchPrefix term: %s", term))
    }
    return found, nil
}


Enter fullscreen mode Exit fullscreen mode

You can find more details and the source code of the full project here.
Of course, in real-world applications, business logic and infrastructure code are much more complicated, and we have to implement many more necessary features.
I hope this article is useful and helpful. I would be happy to receive any feedback or questions, so feel free to contact me by email or any messenger :)

Top comments (0)