In this article, let's build something close to real-world CQRS microservices with tracing and monitoring, using:
Kafka as message broker
gRPC Go implementation of gRPC
PostgreSQL as database
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana for composing observability dashboards with everything from Prometheus
MongoDB as the read database
Redis Type-safe Redis client for Golang
swag Swagger for Go
Echo web framework
You can find the source code in the GitHub repository.
The main idea here is to implement CQRS using Go, Kafka and gRPC.
I won't explain what the CQRS pattern is, because that would make the article too long; the best place to read about it is microservices.io.
I found the example CQRS project and blog of Three Dots Labs very interesting and took them as a starting point.
This example has three services: an API gateway, a read service and a write service, which communicate via Kafka and gRPC.
The write database is Postgres, the read database is MongoDB, and Redis is used for caching.
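To make the data flow easier to picture, here is a minimal in-memory sketch of the same idea. It is not the project's code: a map stands in for Postgres, a channel stands in for Kafka, a second map stands in for the MongoDB/Redis read model, and all type and function names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// ProductCreated is a hypothetical event emitted by the write side.
type ProductCreated struct {
	ID   string
	Name string
}

// WriteService owns the write store and publishes events
// (stand-in for the writer service with Postgres and Kafka).
type WriteService struct {
	store  map[string]string
	events chan<- ProductCreated
}

// CreateProduct persists the product, then publishes an event about it.
func (w *WriteService) CreateProduct(id, name string) {
	w.store[id] = name
	w.events <- ProductCreated{ID: id, Name: name}
}

// ReadService keeps a query-oriented projection
// (stand-in for the reader service with MongoDB and Redis).
type ReadService struct {
	mu    sync.RWMutex
	views map[string]string
}

// Project applies events to the projection until the channel is closed.
func (r *ReadService) Project(events <-chan ProductCreated, done chan<- struct{}) {
	for e := range events {
		r.mu.Lock()
		r.views[e.ID] = e.Name
		r.mu.Unlock()
	}
	close(done)
}

// Get answers queries from the projection only, never from the write store.
func (r *ReadService) Get(id string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	name, ok := r.views[id]
	return name, ok
}

// runDemo wires both sides together and returns what the read side sees for "p1".
func runDemo() (string, bool) {
	events := make(chan ProductCreated, 8)
	done := make(chan struct{})
	writer := &WriteService{store: map[string]string{}, events: events}
	reader := &ReadService{views: map[string]string{}}
	go reader.Project(events, done)
	writer.CreateProduct("p1", "keyboard")
	close(events)
	<-done // wait until the projection has caught up
	return reader.Get("p1")
}

func main() {
	name, ok := runDemo()
	fmt.Println(name, ok)
}
```

The point of the sketch is the direction of the arrows: commands only touch the write side, queries only touch the read side, and the read model is updated asynchronously from events.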
Like any real-world project, we need metrics and tracing: Prometheus and Grafana are used for metrics, and Jaeger for tracing.
This example doesn't implement any interesting business logic and isn't covered by tests, because of limited time.
The UIs will be available at:
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3005 (host port as mapped in the compose file below)
Swagger UI: http://localhost:5001/swagger/index.html
For local development:
make local or make docker_dev // run the docker compose files
make migrate_up // run the SQL migrations
make mongo // run the MongoDB scripts
make swagger // generate the Swagger documentation
To run everything in Docker, use make docker_dev; it has a hot reloading feature.
Docker compose file for this project:
version: "3.8"

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  microservices_postgesql:
    image: postgres:13-alpine
    container_name: microservices_postgesql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=products
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  redis:
    image: redis:6-alpine
    restart: always
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks: [ "microservices" ]

  zoo1:
    image: zookeeper:3.4.9
    restart: always
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog
    networks: [ "microservices" ]

  kafka1:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka1
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:

networks:
  microservices:
    name: microservices
The API gateway service accepts HTTP requests: command handlers publish events to Kafka, and query handlers retrieve data from the reader service via gRPC.
This project uses Echo, but Gin and Chi are also very good for production.
Let's look at the code. The create product HTTP handler binds the request body, generates a UUID, validates the DTO and then calls the create product command.
There are many holy wars about whether to pass the ID to command or service methods as a separate parameter or in the body, or to generate the ID in the command and return it,
whether commands can return values, and so on. Arguing about it doesn't make much sense: it's up to you and your team which way to use, and the time is better spent on more important business tasks :)
// CreateProduct
// @Tags Products
// @Summary Create product
// @Description Create new product item
// @Accept json
// @Produce json
// @Success 201 {object} dto.CreateProductResponseDto
// @Router /products [post]
func (h *productsHandlers) CreateProduct() echo.HandlerFunc {
return func(c echo.Context) error {
h.metrics.CreateProductHttpRequests.Inc()
ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.CreateProduct")
defer span.Finish()
createDto := &dto.CreateProductDto{}
if err := c.Bind(createDto); err != nil {
h.log.WarnMsg("Bind", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
createDto.ProductID = uuid.NewV4()
if err := h.v.StructCtx(ctx, createDto); err != nil {
h.log.WarnMsg("validate", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
if err := h.ps.Commands.CreateProduct.Handle(ctx, commands.NewCreateProductCommand(createDto)); err != nil {
h.log.WarnMsg("CreateProduct", err)
h.metrics.ErrorHttpRequests.Inc()
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.metrics.SuccessHttpRequests.Inc()
return c.JSON(http.StatusCreated, dto.CreateProductResponseDto{ProductID: createDto.ProductID})
}
}
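If you want to try the same bind, generate ID, validate, dispatch flow without Echo, here is a rough sketch using only the standard library. Everything in it is an assumption made for illustration: the request and response types, the validation rules, the hand-rolled newUUIDv4 helper and the dispatch callback (which stands in for the command handler) are not part of the project.

```go
package main

import (
	"crypto/rand"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Hypothetical request and response bodies for the sketch.
type createProductRequest struct {
	Name  string  `json:"name"`
	Price float64 `json:"price"`
}

type createProductResponse struct {
	ProductID string `json:"productId"`
}

// newUUIDv4 builds a random (version 4) UUID string using crypto/rand.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// createProductHandler decodes and validates the body, assigns an ID
// and hands the command to dispatch (a stand-in for the command handler).
func createProductHandler(dispatch func(id string, req createProductRequest) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req createProductRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "invalid JSON body", http.StatusBadRequest)
			return
		}
		if strings.TrimSpace(req.Name) == "" || req.Price < 0 {
			http.Error(w, "name is required and price must be non-negative", http.StatusBadRequest)
			return
		}
		id, err := newUUIDv4()
		if err != nil {
			http.Error(w, "cannot generate id", http.StatusInternalServerError)
			return
		}
		if err := dispatch(id, req); err != nil {
			http.Error(w, "cannot create product", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusCreated)
		json.NewEncoder(w).Encode(createProductResponse{ProductID: id})
	}
}

// post sends a JSON body through the handler and returns the status code and response body.
func post(h http.Handler, body string) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/products", strings.NewReader(body))
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	h := createProductHandler(func(id string, req createProductRequest) error { return nil })
	code, body := post(h, `{"name":"keyboard","price":49.9}`)
	fmt.Println(code, body)
}
```

Generating the ID at the edge means the gateway can answer with 201 and the new ID right away, even though the product is written asynchronously later.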
The create product command handler is simple: it marshals the command data and publishes it to Kafka.
Kafka accepts []byte as the value, and protobuf is used here. To pass tracing through Kafka we have to use headers;
you can find helpers for this in the tracing utils.
Go has some good libraries for working with Kafka; I like segmentio/kafka-go.
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
createDto := &kafkaMessages.ProductCreate{
ProductID: command.CreateDto.ProductID.String(),
Name: command.CreateDto.Name,
Description: command.CreateDto.Description,
Price: command.CreateDto.Price,
}
dtoBytes, err := proto.Marshal(createDto)
if err != nil {
return err
}
return c.kafkaProducer.PublishMessage(ctx, kafka.Message{
Topic: c.cfg.KafkaTopics.ProductCreate.TopicName,
Value: dtoBytes,
Time: time.Now().UTC(),
Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
})
}
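The idea behind passing tracing through Kafka headers can be sketched without any tracing library. This is only an illustration, not the project's tracing utils: the Header type mimics the key/value shape of a Kafka record header, the carrier is a plain map[string]string, and the trace ID value is made up. "uber-trace-id" is the default header name used by Jaeger clients.

```go
package main

import "fmt"

// Header mimics a Kafka record header: a key with a raw byte value.
type Header struct {
	Key   string
	Value []byte
}

// injectTraceHeaders copies a text-map carrier (trace ID, baggage, ...) into headers
// on the producer side.
func injectTraceHeaders(carrier map[string]string) []Header {
	headers := make([]Header, 0, len(carrier))
	for k, v := range carrier {
		headers = append(headers, Header{Key: k, Value: []byte(v)})
	}
	return headers
}

// extractTraceHeaders rebuilds the carrier on the consumer side,
// so a child span can be started from it.
func extractTraceHeaders(headers []Header) map[string]string {
	carrier := make(map[string]string, len(headers))
	for _, h := range headers {
		carrier[h.Key] = string(h.Value)
	}
	return carrier
}

func main() {
	// The value below is a made-up example of a trace context.
	sent := map[string]string{"uber-trace-id": "4bf92f3577b34da6:00f067aa0ba902b7:0:1"}
	headers := injectTraceHeaders(sent)
	received := extractTraceHeaders(headers)
	fmt.Println(received["uber-trace-id"] == sent["uber-trace-id"])
}
```

With a real tracer, the carrier is filled by the tracer's inject call on the producer side and handed to its extract call on the consumer side; the headers are just the transport.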
When working with Kafka it's nice to have a UI client for debugging; personally I like to use Conduktor.
The writer service consumes Kafka topics, processes the messages by writing to Postgres, and publishes successfully processed messages to Kafka.
For working with Postgres in Go, in my opinion the best choice is pgx, but if you need a query builder, squirrel is a very good library.
Personally I don't like ORMs, but from what I have seen, teams often use gorm; it's up to you.
The ProcessMessages method listens to Kafka topics and calls a specific method depending on the topic:
func (s *productMessageProcessor) ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
m, err := r.FetchMessage(ctx)
if err != nil {
s.log.Warnf("workerID: %v, err: %v", workerID, err)
continue
}
s.logProcessMessage(m, workerID)
switch m.Topic {
case s.cfg.KafkaTopics.ProductCreate.TopicName:
s.processCreateProduct(ctx, r, m)
case s.cfg.KafkaTopics.ProductUpdate.TopicName:
s.processUpdateProduct(ctx, r, m)
case s.cfg.KafkaTopics.ProductDelete.TopicName:
s.processDeleteProduct(ctx, r, m)
}
}
}
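The same worker loop with topic-based dispatch can be sketched with the standard library only. This is an illustration under assumptions: a channel stands in for the Kafka reader, the topic names are hypothetical, and a map of handlers replaces the switch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a simplified stand-in for a Kafka message.
type Message struct {
	Topic string
	Value []byte
}

// HandlerFunc processes one message of a given topic.
type HandlerFunc func(ctx context.Context, m Message)

// consume reads messages until the context is cancelled or the source is closed,
// routing each message to the handler registered for its topic.
func consume(ctx context.Context, src <-chan Message, handlers map[string]HandlerFunc, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-src:
			if !ok {
				return
			}
			if h, found := handlers[m.Topic]; found {
				h(ctx, m)
			}
		}
	}
}

// runWorkers starts n workers (n must be at least 1), feeds them msgs
// and returns how many messages were handled per topic.
func runWorkers(n int, msgs []Message) map[string]int {
	var mu sync.Mutex
	counts := map[string]int{}
	count := func(ctx context.Context, m Message) {
		mu.Lock()
		counts[m.Topic]++
		mu.Unlock()
	}
	// Hypothetical topic names.
	handlers := map[string]HandlerFunc{
		"product_create": count,
		"product_update": count,
	}
	src := make(chan Message)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go consume(context.Background(), src, handlers, &wg)
	}
	for _, m := range msgs {
		src <- m
	}
	close(src)
	wg.Wait() // all handlers have finished after this point
	return counts
}

func main() {
	counts := runWorkers(2, []Message{
		{Topic: "product_create"},
		{Topic: "product_update"},
		{Topic: "product_create"},
	})
	fmt.Println(counts["product_create"], counts["product_update"])
}
```

A handler map makes it easy to register new topics without touching the loop; the switch in the original does the same job and is perfectly fine for three topics.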
The Kafka message processing method deserializes and validates the message body, passes it to the command and commits.
Here we have to choose how to handle errors, which depends on the business logic; for example,
we can use the dead letter queue pattern.
func (s *productMessageProcessor) processCreateProduct(ctx context.Context, r *kafka.Reader, m kafka.Message) {
s.metrics.CreateProductKafkaMessages.Inc()
ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "productMessageProcessor.processCreateProduct")
defer span.Finish()
var msg kafkaMessages.ProductCreate
if err := proto.Unmarshal(m.Value, &msg); err != nil {
s.log.WarnMsg("proto.Unmarshal", err)
s.commitErrMessage(ctx, r, m)
return
}
proUUID, err := uuid.FromString(msg.GetProductID())
if err != nil {
s.log.WarnMsg("uuid.FromString", err)
s.commitErrMessage(ctx, r, m)
return
}
command := commands.NewCreateProductCommand(proUUID, msg.GetName(), msg.GetDescription(), msg.GetPrice())
if err := s.v.StructCtx(ctx, command); err != nil {
s.log.WarnMsg("validate", err)
s.commitErrMessage(ctx, r, m)
return
}
if err := retry.Do(func() error {
return s.ps.Commands.CreateProduct.Handle(ctx, command)
}, append(retryOptions, retry.Context(ctx))...); err != nil {
s.log.WarnMsg("CreateProduct.Handle", err)
s.metrics.ErrorKafkaMessages.Inc()
return
}
s.commitMessage(ctx, r, m)
}
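As one possible way to handle messages that keep failing, here is a small sketch of retries followed by a dead letter queue. It is not what the project does (the method above simply returns without committing when retries are exhausted). The Outcome type, processWithDLQ and the deadLetter callback are hypothetical names, and a real version would add a backoff between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in for a Kafka message.
type Message struct {
	Topic string
	Value []byte
}

// Outcome tells the consumer what to do with the offset after processing.
type Outcome int

const (
	Uncommitted  Outcome = iota // nothing was done, do not commit
	Committed                   // handled successfully, commit the offset
	DeadLettered                // gave up and parked the message, commit the offset
)

var errTransient = errors.New("transient failure")

// processWithDLQ calls handle up to maxAttempts times. If every attempt fails,
// the message is forwarded to deadLetter so the partition is not blocked.
func processWithDLQ(m Message, maxAttempts int, handle func(Message) error, deadLetter func(Message, error) error) (Outcome, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = handle(m); lastErr == nil {
			return Committed, nil
		}
	}
	if err := deadLetter(m, lastErr); err != nil {
		// Neither processed nor parked: report it so the caller does not commit.
		return Uncommitted, fmt.Errorf("dead letter publish failed: %w", err)
	}
	return DeadLettered, nil
}

func main() {
	attempts := 0
	flaky := func(Message) error {
		attempts++
		if attempts < 3 {
			return errTransient
		}
		return nil
	}
	park := func(m Message, cause error) error {
		fmt.Println("parked:", m.Topic, cause)
		return nil
	}
	out, err := processWithDLQ(Message{Topic: "product_create"}, 3, flaky, park)
	fmt.Println(out == Committed, err)
}
```

The trade-off is ordering: parking a message and moving on keeps the consumer healthy, but later messages for the same product may be applied before the parked one is replayed.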
The writer service's create product command saves data to Postgres and publishes a product created event to Kafka:
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
productDto := &models.Product{ProductID: command.ProductID, Name: command.Name, Description: command.Description, Price: command.Price}
product, err := c.pgRepo.CreateProduct(ctx, productDto)
if err != nil {
return err
}
msg := &kafkaMessages.ProductCreated{Product: mappers.ProductToGrpcMessage(product)}
msgBytes, err := proto.Marshal(msg)
if err != nil {
return err
}
message := kafka.Message{
Topic: c.cfg.KafkaTopics.ProductCreated.TopicName,
Value: msgBytes,
Time: time.Now().UTC(),
Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
}
return c.kafkaProducer.PublishMessage(ctx, message)
}
The Postgres repository uses pgx; the code is simple:
func (p *productRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productRepository.CreateProduct")
defer span.Finish()
var created models.Product
if err := p.db.QueryRow(ctx, createProductQuery, &product.ProductID, &product.Name, &product.Description, &product.Price).Scan(
&created.ProductID,
&created.Name,
&created.Description,
&created.Price,
&created.CreatedAt,
&created.UpdatedAt,
); err != nil {
return nil, errors.Wrap(err, "db.QueryRow")
}
return &created, nil
}
The reader service consumes Kafka messages, saves them to MongoDB and caches them in Redis, then serves the projected data via gRPC calls.
The Kafka listener handlers look the same here; the product created command saves data to MongoDB and caches it in Redis:
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
product := &models.Product{
ProductID: command.ProductID,
Name: command.Name,
Description: command.Description,
Price: command.Price,
CreatedAt: command.CreatedAt,
UpdatedAt: command.UpdatedAt,
}
created, err := c.mongoRepo.CreateProduct(ctx, product)
if err != nil {
return err
}
c.redisRepo.PutProduct(ctx, created.ProductID, created)
return nil
}
MongoDB repository save method:
func (p *mongoRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.CreateProduct")
defer span.Finish()
collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)
_, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "InsertOne")
}
return product, nil
}
And the Redis caching method is:
func (r *redisRepository) PutProduct(ctx context.Context, key string, product *models.Product) {
span, ctx := opentracing.StartSpanFromContext(ctx, "redisRepository.PutProduct")
defer span.Finish()
productBytes, err := json.Marshal(product)
if err != nil {
r.log.WarnMsg("json.Marshal", err)
return
}
if err := r.redisClient.HSetNX(ctx, r.getRedisProductPrefixKey(), key, productBytes).Err(); err != nil {
r.log.WarnMsg("redisClient.HSetNX", err)
return
}
r.log.Debugf("HSetNX prefix: %s, key: %s", r.getRedisProductPrefixKey(), key)
}
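On the read path, a cache like this is usually combined with a cache-aside lookup: try the cache first, fall back to the database and then fill the cache. Below is a minimal in-memory sketch of that idea, not the project's code. Cache stands in for the Redis hash, the load function stands in for the MongoDB lookup, and all names are hypothetical. Note that Put here overwrites an existing entry, whereas Redis HSETNX only writes when the field does not exist yet, so an update path would need an overwriting write or a delete.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Product is a simplified read model.
type Product struct {
	ID   string
	Name string
}

var ErrNotFound = errors.New("product not found")

// Cache is a tiny in-memory stand-in for the Redis hash.
type Cache struct {
	mu    sync.Mutex
	items map[string]Product
}

func NewCache() *Cache { return &Cache{items: map[string]Product{}} }

// Get returns the cached product, if any.
func (c *Cache) Get(id string) (Product, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	p, ok := c.items[id]
	return p, ok
}

// Put stores the product and overwrites any existing entry.
func (c *Cache) Put(p Product) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[p.ID] = p
}

// Reader answers queries using the cache first and the database second.
type Reader struct {
	cache *Cache
	load  func(id string) (Product, error) // stand-in for the database lookup
}

// GetByID implements the cache-aside read: hit the cache, else load and fill the cache.
func (r *Reader) GetByID(id string) (Product, error) {
	if p, ok := r.cache.Get(id); ok {
		return p, nil
	}
	p, err := r.load(id)
	if err != nil {
		return Product{}, err
	}
	r.cache.Put(p)
	return p, nil
}

func main() {
	db := map[string]Product{"p1": {ID: "p1", Name: "keyboard"}}
	loads := 0
	reader := &Reader{cache: NewCache(), load: func(id string) (Product, error) {
		loads++
		p, ok := db[id]
		if !ok {
			return Product{}, ErrNotFound
		}
		return p, nil
	}}
	reader.GetByID("p1") // miss: loads from the database and fills the cache
	reader.GetByID("p1") // hit: served from the cache
	fmt.Println("database loads:", loads)
}
```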
Then the API gateway can request data from the reader service using gRPC.
The reader gRPC service method:
func (s *grpcService) GetProductById(ctx context.Context, req *readerService.GetProductByIdReq) (*readerService.GetProductByIdRes, error) {
s.metrics.GetProductByIdGrpcRequests.Inc()
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetProductById")
defer span.Finish()
productUUID, err := uuid.FromString(req.GetProductID())
if err != nil {
s.log.WarnMsg("uuid.FromString", err)
return nil, s.errResponse(codes.InvalidArgument, err)
}
query := queries.NewGetProductByIdQuery(productUUID)
if err := s.v.StructCtx(ctx, query); err != nil {
s.log.WarnMsg("validate", err)
return nil, s.errResponse(codes.InvalidArgument, err)
}
product, err := s.ps.Queries.GetProductById.Handle(ctx, query)
if err != nil {
s.log.WarnMsg("GetProductById.Handle", err)
return nil, s.errResponse(codes.Internal, err)
}
s.metrics.SuccessGrpcRequests.Inc()
return &readerService.GetProductByIdRes{Product: models.ProductToGrpcMessage(product)}, nil
}
Get by id method tracing:
The API gateway get product by ID HTTP handler method:
// GetProductByID
// @Tags Products
// @Summary Get product
// @Description Get product by id
// @Accept json
// @Produce json
// @Param id path string true "Product ID"
// @Success 200 {object} dto.ProductResponse
// @Router /products/{id} [get]
func (h *productsHandlers) GetProductByID() echo.HandlerFunc {
return func(c echo.Context) error {
h.metrics.GetProductByIdHttpRequests.Inc()
ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.GetProductByID")
defer span.Finish()
productUUID, err := uuid.FromString(c.Param(constants.ID))
if err != nil {
h.log.WarnMsg("uuid.FromString", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
query := queries.NewGetProductByIdQuery(productUUID)
response, err := h.ps.Queries.GetProductById.Handle(ctx, query)
if err != nil {
h.log.WarnMsg("GetProductById", err)
h.metrics.ErrorHttpRequests.Inc()
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.metrics.SuccessHttpRequests.Inc()
return c.JSON(http.StatusOK, response)
}
}
And the query handler code:
func (q *getProductByIdHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*dto.ProductResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "getProductByIdHandler.Handle")
defer span.Finish()
ctx = tracing.InjectTextMapCarrierToGrpcMetaData(ctx, span.Context())
res, err := q.rsClient.GetProductById(ctx, &readerService.GetProductByIdReq{ProductID: query.ProductID.String()})
if err != nil {
return nil, err
}
return dto.ProductResponseFromGrpc(res.GetProduct()), nil
}
Search products method:
func (p *mongoRepository) Search(ctx context.Context, search string, pagination *utils.Pagination) (*models.ProductsList, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Search")
defer span.Finish()
collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)
filter := bson.D{
{Key: "$or", Value: bson.A{
bson.D{{Key: "name", Value: primitive.Regex{Pattern: search, Options: "i"}}},
bson.D{{Key: "description", Value: primitive.Regex{Pattern: search, Options: "i"}}},
}},
}
count, err := collection.CountDocuments(ctx, filter)
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "CountDocuments")
}
if count == 0 {
return &models.ProductsList{Products: make([]*models.Product, 0)}, nil
}
limit := int64(pagination.GetLimit())
skip := int64(pagination.GetOffset())
cursor, err := collection.Find(ctx, filter, &options.FindOptions{
Limit: &limit,
Skip: &skip,
})
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "Find")
}
defer cursor.Close(ctx)
products := make([]*models.Product, 0, pagination.GetSize())
for cursor.Next(ctx) {
var prod models.Product
if err := cursor.Decode(&prod); err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "Decode")
}
products = append(products, &prod)
}
if err := cursor.Err(); err != nil {
span.SetTag("error", true)
span.LogKV("error_code", err.Error())
return nil, errors.Wrap(err, "cursor.Err")
}
return models.NewProductListWithPagination(products, count, pagination), nil
}
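Two details of the search method are easy to get wrong: converting a page number into skip/limit values, and passing raw user input into a regex. Here is a small sketch of both, under assumptions: the Pagination type below is hypothetical (1-based pages, default size 10, maximum 100) and is not the project's utils.Pagination, and regexp.QuoteMeta escapes for Go's regex syntax, which should also be safe for the common metacharacters in MongoDB patterns.

```go
package main

import (
	"fmt"
	"regexp"
)

const (
	defaultSize = 10
	maxSize     = 100
)

// Pagination is a hypothetical page/size pair; pages are 1-based in this sketch.
type Pagination struct {
	Page int
	Size int
}

// NewPagination normalises out-of-range input instead of returning an error.
func NewPagination(page, size int) Pagination {
	if page < 1 {
		page = 1
	}
	if size < 1 {
		size = defaultSize
	}
	if size > maxSize {
		size = maxSize
	}
	return Pagination{Page: page, Size: size}
}

// Limit is the maximum number of documents to return.
func (p Pagination) Limit() int64 { return int64(p.Size) }

// Offset is the number of documents to skip before the current page.
func (p Pagination) Offset() int64 { return int64(p.Page-1) * int64(p.Size) }

// TotalPages rounds up so a partially filled last page still counts.
func (p Pagination) TotalPages(total int64) int64 {
	return (total + int64(p.Size) - 1) / int64(p.Size)
}

// SearchPattern escapes regex metacharacters so user input is matched literally.
func SearchPattern(search string) string {
	return regexp.QuoteMeta(search)
}

func main() {
	p := NewPagination(3, 20)
	fmt.Println(p.Offset(), p.Limit(), p.TotalPages(41))
	fmt.Println(SearchPattern("c++ (new)"))
}
```

Without escaping, a search string such as "c++" is an invalid or unexpectedly broad pattern, and a crafted expression can make the query very slow.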
You can find more details and the source code here.
Of course, in real-world applications we have to implement many more necessary features,
like circuit breakers, retries, rate limiters, etc. Depending on the project, they can be implemented in different ways;
for example, you can use Kubernetes and Istio for some of them.
I hope this article is useful and helpful. I'll be happy to receive any feedback or questions; feel free to contact me by email or any messenger :)
Top comments (3)
hi,
is
really needed? anyway you pass a pointer to the function.
why create another variable when you can write into the passed *models.Product?
Hi, it's up to you and depends on what you need. In this example, yes, we can reuse the pointer like in other methods returning the product DTO, or maybe in another case we need to return another DTO, e.g. in Postgres we store the id as a uuid but want to return it as a string, or return only the id or only an error :)
Hi ,
After call update method by product_id it updates in postgresql but no changes in reader_service .
How does caches in redis retrieve new product info from updated pg db ?