This article walks through building a mail microservice that resembles a real production service, using RabbitMQ, gRPC, Prometheus and Grafana for monitoring, and Jaeger for OpenTracing.
Source code GitHub repository
RabbitMQ and Kafka are very often used for asynchronous messaging between services. The services communicate by exchanging messages over messaging channels.
The advantages of message-based systems are:
- The sender only needs to know the location of the message broker, not the addresses of all possible receivers.
- It's possible to have multiple receivers for a message.
- We can easily add new receivers without any changes in the sender.
- Messages can be queued, ensuring delivery after a receiver has been down.
In short, senders and receivers are decoupled from each other.
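To make the decoupling concrete, here is a minimal in-memory sketch. The Broker type below is hypothetical and only stands in for a real broker such as RabbitMQ; it shows that a second receiver can be added without changing the sender.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a toy in-memory stand-in for a message broker.
// It is a hypothetical type used only to illustrate decoupling.
type Broker struct {
	mu     sync.RWMutex
	queues map[string][]chan string // topic -> one buffered queue per receiver
}

// NewBroker creates an empty broker.
func NewBroker() *Broker {
	return &Broker{queues: make(map[string][]chan string)}
}

// Subscribe registers a new receiver for a topic and returns its queue.
func (b *Broker) Subscribe(topic string, buffer int) <-chan string {
	ch := make(chan string, buffer)
	b.mu.Lock()
	b.queues[topic] = append(b.queues[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish copies the message into the queue of every receiver of the topic.
// The sender only knows the broker, never the receivers.
// Note: this toy version blocks when a receiver's buffer is full.
func (b *Broker) Publish(topic, msg string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.queues[topic] {
		ch <- msg
	}
}

func main() {
	broker := NewBroker()
	mailer := broker.Subscribe("emails", 8)
	audit := broker.Subscribe("emails", 8) // added without touching the sender

	broker.Publish("emails", "welcome")

	fmt.Println(<-mailer, <-audit)
}
```

Because each receiver has its own buffered queue, a message published while a receiver is busy simply waits in that queue, which is the same idea as the queued delivery mentioned in the list above.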
When you build a system of microservices, you can end up with many, many services. Coordinating communications between all these services can be tricky.
RabbitMQ can act as a central broker and services can just subscribe to the type of messages that they need.
First, start all the necessary Docker containers:
make local
The web UIs will then be available at:
RabbitMQ UI: http://localhost:15672
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3000
The RabbitMQ management plugin UI is available at http://localhost:15672; the default login/password is guest/guest.
This example docker-compose.local.yml file contains all the necessary images:
version: "3.8"

services:
  postgesql:
    image: postgres:13-alpine
    container_name: auth_postgesql
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=mails_db
    volumes:
      - ./pgdata:/var/lib/postgresql/data
    networks:
      - microservice_network

  rabbitmq:
    # There is a prebuilt RabbitMQ image; see
    # https://hub.docker.com/_/rabbitmq/ for details.
    # This variant is built on Alpine Linux (it's smaller) and includes
    # the management UI.
    image: 'rabbitmq:3.6-management-alpine'
    # These ports are exposed on the host; 'hostport:containerport'.
    # You could connect to this server from outside with the *host's*
    # DNS name or IP address and port 5672 (the left-hand side of the
    # colon).
    ports:
      # The standard AMQP protocol port
      - '5672:5672'
      # HTTP management UI
      - '15672:15672'
    # Run this container on a private network for this application.
    # This is necessary for magic Docker DNS to work: other containers
    # also running on this network will see a host name "rabbitmq"
    # (the name of this section) and the internal port 5672, even though
    # that's not explicitly published above.
    networks:
      - microservice_network

  prometheus:
    container_name: prometheus_container
    image: prom/prometheus
    volumes:
      - ./docker/monitoring/prometheus-local.yml:/etc/prometheus/prometheus.yml:Z
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--storage.tsdb.retention=20d'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'
    ports:
      - '9090:9090'
    networks:
      - microservice_network

  node_exporter:
    container_name: node_exporter_container
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks:
      - microservice_network

  grafana:
    container_name: grafana_container
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks:
      - microservice_network

networks:
  microservice_network:
    driver: bridge
Once the containers are running, the Prometheus and Grafana UIs are available too, and after sending a few requests you can monitor the metrics on the dashboard:
Grafana's default login/password is admin/admin.
RabbitMQ has a Go client library and good documentation with a nice tutorial.
Dial the connection:
// Initialize new RabbitMQ connection
func NewRabbitMQConn(cfg *config.Config) (*amqp.Connection, error) {
connAddr := fmt.Sprintf(
"amqp://%s:%s@%s:%s/",
cfg.RabbitMQ.User,
cfg.RabbitMQ.Password,
cfg.RabbitMQ.Host,
cfg.RabbitMQ.Port,
)
return amqp.Dial(connAddr)
}
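One caveat with building the address through fmt.Sprintf: a password that contains characters such as @ or / produces a URL that no longer parses correctly. A minimal sketch of an alternative, assuming we are free to build the address with net/url instead (buildAMQPURL is a hypothetical helper, not part of the repository):

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// buildAMQPURL is a hypothetical helper that assembles the same
// amqp://user:password@host:port/ address as the Sprintf version,
// but percent-encodes the credentials where needed.
func buildAMQPURL(user, password, host, port string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, password),
		Host:   net.JoinHostPort(host, port),
		Path:   "/",
	}
	return u.String()
}

func main() {
	// With plain credentials the result matches the Sprintf version.
	fmt.Println(buildAMQPURL("guest", "guest", "localhost", "5672"))
	// With special characters the password is escaped instead of breaking the URL.
	fmt.Println(buildAMQPURL("guest", "p@ss/word", "localhost", "5672"))
}
```

The resulting string can be passed to amqp.Dial in the same way as connAddr above.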
Let's create our producer and consumer and pass amqpConn to the constructor:
// EmailsConsumer is the RabbitMQ consumer for emails
type EmailsConsumer struct {
amqpConn *amqp.Connection
logger logger.Logger
emailUC email.EmailsUseCase
}
The CreateChannel method opens a channel: amqpConn.Channel opens a unique, concurrent server channel.
Next we need to declare the exchange and the queue, then bind them using the binding key (routing key).
The ch.Qos method lets us prefetch messages. With a prefetch count greater than zero, the server will deliver that many
messages to consumers before acknowledgments are received.
// CreateChannel opens a channel, declares the exchange and the queue, and binds them
func (c *EmailsConsumer) CreateChannel(exchangeName, queueName, bindingKey, consumerTag string) (*amqp.Channel, error) {
ch, err := c.amqpConn.Channel()
if err != nil {
return nil, errors.Wrap(err, "Error amqpConn.Channel")
}
c.logger.Infof("Declaring exchange: %s", exchangeName)
err = ch.ExchangeDeclare(
exchangeName,
exchangeKind,
exchangeDurable,
exchangeAutoDelete,
exchangeInternal,
exchangeNoWait,
nil,
)
if err != nil {
return nil, errors.Wrap(err, "Error ch.ExchangeDeclare")
}
queue, err := ch.QueueDeclare(
queueName,
queueDurable,
queueAutoDelete,
queueExclusive,
queueNoWait,
nil,
)
if err != nil {
return nil, errors.Wrap(err, "Error ch.QueueDeclare")
}
c.logger.Infof("Declared queue, binding it to exchange: Queue: %v, messagesCount: %v, "+
"consumerCount: %v, exchange: %v, bindingKey: %v",
queue.Name,
queue.Messages,
queue.Consumers,
exchangeName,
bindingKey,
)
err = ch.QueueBind(
queue.Name,
bindingKey,
exchangeName,
queueNoWait,
nil,
)
if err != nil {
return nil, errors.Wrap(err, "Error ch.QueueBind")
}
c.logger.Infof("Queue bound to exchange, starting to consume from queue, consumerTag: %v", consumerTag)
err = ch.Qos(
prefetchCount, // prefetch count
prefetchSize, // prefetch size
prefetchGlobal, // global
)
if err != nil {
return nil, errors.Wrap(err, "Error ch.Qos")
}
return ch, nil
}
The StartConsumer method accepts the exchange and queue parameters and the worker pool size:
// Start new rabbitmq consumer
func (c *EmailsConsumer) StartConsumer(workerPoolSize int, exchange, queueName, bindingKey, consumerTag string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch, err := c.CreateChannel(exchange, queueName, bindingKey, consumerTag)
if err != nil {
return errors.Wrap(err, "CreateChannel")
}
defer ch.Close()
deliveries, err := ch.Consume(
queueName,
consumerTag,
consumeAutoAck,
consumeExclusive,
consumeNoLocal,
consumeNoWait,
nil,
)
if err != nil {
return errors.Wrap(err, "Consume")
}
for i := 0; i < workerPoolSize; i++ {
go c.worker(ctx, deliveries)
}
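chanErr := <-ch.NotifyClose(make(chan *amqp.Error))
if chanErr == nil {
// Graceful close: returning the nil *amqp.Error directly would produce a non-nil error interface
return nil
}
c.logger.Errorf("ch.NotifyClose: %v", chanErr)
return chanErr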
}
A very popular phrase in Go: "Do not communicate by sharing memory; instead, share memory by communicating."
Run as many workers as the concurrency we need:
for i := 0; i < workerPoolSize; i++ {
go c.worker(ctx, deliveries)
}
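The same pattern can be tried without RabbitMQ. Below is a minimal, standard-library sketch of a worker pool in which several goroutines read from one shared channel; runWorkers and the string jobs are assumptions made for illustration and stand in for c.worker and the deliveries channel.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts poolSize goroutines that all read from the same jobs
// channel, calls handle for every job, and returns the number of handled
// jobs once the channel has been closed and drained.
func runWorkers(poolSize int, jobs <-chan string, handle func(string)) int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		handled int
	)
	for i := 0; i < poolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each job is received by exactly one worker.
			for job := range jobs {
				handle(job)
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return handled
}

func main() {
	jobs := make(chan string)
	go func() {
		defer close(jobs)
		for i := 0; i < 10; i++ {
			jobs <- fmt.Sprintf("delivery-%d", i)
		}
	}()

	n := runWorkers(3, jobs, func(job string) { _ = job })
	fmt.Println("handled:", n)
}
```

The channel does the coordination: no worker needs to know about the others, and closing the channel is enough to stop all of them.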
Each worker reads deliveries from the channel and processes them.
All deliveries in AMQP must be acknowledged.
If you called Channel.Consume with autoAck set to true, the server automatically acknowledges each message and
Delivery.Ack should not be called. Otherwise, you must call Delivery.Ack after
you have successfully processed the delivery.
Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
Reject delegates a negative acknowledgement through the Acknowledger interface.
func (c *EmailsConsumer) worker(ctx context.Context, messages <-chan amqp.Delivery) {
for delivery := range messages {
span, ctx := opentracing.StartSpanFromContext(ctx, "EmailsConsumer.worker")
c.logger.Infof("processDeliveries deliveryTag: %v", delivery.DeliveryTag)
incomingMessages.Inc()
err := c.emailUC.SendEmail(ctx, delivery.Body)
if err != nil {
if err := delivery.Reject(false); err != nil {
c.logger.Errorf("Err delivery.Reject: %v", err)
}
c.logger.Errorf("Failed to process delivery: %v", err)
errorMessages.Inc()
} else {
err = delivery.Ack(false)
if err != nil {
c.logger.Errorf("Failed to acknowledge delivery: %v", err)
}
successMessages.Inc()
}
span.Finish()
}
c.logger.Info("Deliveries channel closed")
}
The incomingMessages.Inc(), successMessages.Inc() and errorMessages.Inc() calls increment the Prometheus counters, which are declared like this:
var (
incomingMessages = promauto.NewCounter(prometheus.CounterOpts{
Name: "emails_incoming_rabbitmq_messages_total",
Help: "The total number of incoming RabbitMQ messages",
})
successMessages = promauto.NewCounter(prometheus.CounterOpts{
Name: "emails_success_incoming_rabbitmq_messages_total",
Help: "The total number of successfully processed incoming RabbitMQ messages",
})
errorMessages = promauto.NewCounter(prometheus.CounterOpts{
Name: "emails_error_incoming_rabbitmq_message_total",
Help: "The total number of incoming RabbitMQ messages that failed processing",
})
)
In the worker we call the SendEmail use case method. Here we deserialize and sanitize the delivery body and then send the email through an SMTP server:
func (e *EmailUseCase) SendEmail(ctx context.Context, deliveryBody []byte) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "EmailUseCase.SendEmail")
defer span.Finish()
mail := &models.Email{}
if err := json.Unmarshal(deliveryBody, mail); err != nil {
return errors.Wrap(err, "json.Unmarshal")
}
mail.Body = utils.SanitizeString(mail.Body)
mail.From = e.cfg.Smtp.User
if err := utils.ValidateStruct(ctx, mail); err != nil {
return errors.Wrap(err, "ValidateStruct")
}
if err := e.mailer.Send(ctx, mail); err != nil {
return errors.Wrap(err, "mailer.Send")
}
createdEmail, err := e.emailsRepo.CreateEmail(ctx, mail)
if err != nil {
return errors.Wrap(err, "emailsRepo.CreateEmail")
}
span.LogFields(log.String("emailID", createdEmail.EmailID.String()))
e.logger.Infof("Success send email: %v", createdEmail.EmailID)
return nil
}
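The decode-then-validate step can be tried on its own. The sketch below is an assumption-heavy simplification: the Email struct and its JSON tags are hypothetical (the real models.Email lives in the repository), and the checks use only the standard library rather than utils.ValidateStruct.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/mail"
	"strings"
)

// Email is a hypothetical, trimmed-down stand-in for models.Email.
type Email struct {
	To      []string `json:"to"`
	Subject string   `json:"subject"`
	Body    string   `json:"body"`
}

// decodeEmail deserializes a delivery body and applies a few basic checks
// before the message would be handed to the mailer.
func decodeEmail(body []byte) (*Email, error) {
	var m Email
	if err := json.Unmarshal(body, &m); err != nil {
		return nil, fmt.Errorf("json.Unmarshal: %w", err)
	}
	if len(m.To) == 0 {
		return nil, errors.New("at least one recipient is required")
	}
	for _, addr := range m.To {
		if _, err := mail.ParseAddress(addr); err != nil {
			return nil, fmt.Errorf("invalid recipient %q: %w", addr, err)
		}
	}
	if strings.TrimSpace(m.Body) == "" {
		return nil, errors.New("body must not be empty")
	}
	return &m, nil
}

func main() {
	payload := []byte(`{"to":["user@example.com"],"subject":"Hi","body":"Hello"}`)
	m, err := decodeEmail(payload)
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("accepted message for", m.To)
}
```

Rejecting malformed payloads before calling the SMTP server keeps obviously bad messages from consuming a send attempt.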
I use Gomail to send emails. Of course an SMTP server is required, but you can find many free options for testing. The Send method is very simple:
// Send email
func (m *Mailer) Send(ctx context.Context, email *models.Email) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "Mailer.Send")
defer span.Finish()
gm := gomail.NewMessage()
gm.SetHeader("From", email.From)
gm.SetHeader("To", email.To...)
gm.SetHeader("Subject", email.Subject)
gm.SetBody(email.ContentType, email.Body)
return m.mailDialer.DialAndSend(gm)
}
After the email has been sent successfully, let's save it in Postgres. For SQL databases, sqlx combined with pgx is a good choice.
// Create email
func (e *EmailsRepository) CreateEmail(ctx context.Context, email *models.Email) (*models.Email, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "EmailsRepository.CreateEmail")
defer span.Finish()
var id uuid.UUID
if err := e.db.QueryRowContext(
ctx,
createEmailQuery,
email.GetToString(),
email.From,
email.Subject,
email.Body,
email.ContentType,
).Scan(&id); err != nil {
return nil, errors.Wrap(err, "db.QueryRowContext")
}
email.EmailID = id
return email, nil
}
As you can see, on every layer we first create a tracing span:
span, ctx := opentracing.StartSpanFromContext(ctx, "EmailsRepository.CreateEmail")
defer span.Finish()
Open http://localhost:16686/ to see how it looks in Jaeger.
Our consumer is done. Next, let's create the publisher, which has a very simple Publish method:
// Publish message
func (p *EmailsPublisher) Publish(body []byte, contentType string) error {
p.logger.Infof("Publishing message Exchange: %s, RoutingKey: %s", p.cfg.RabbitMQ.Exchange, p.cfg.RabbitMQ.RoutingKey)
if err := p.amqpChan.Publish(
p.cfg.RabbitMQ.Exchange,
p.cfg.RabbitMQ.RoutingKey,
publishMandatory,
publishImmediate,
amqp.Publishing{
ContentType: contentType,
DeliveryMode: amqp.Persistent,
MessageId: uuid.New().String(),
Timestamp: time.Now(),
Body: body,
},
); err != nil {
return errors.Wrap(err, "ch.Publish")
}
publishedMessages.Inc()
return nil
}
Our microservice works with RabbitMQ and communicates with other services over gRPC, so let's create a proto file. It has 3 methods:
service EmailService{
rpc SendEmails(SendEmailRequest) returns (SendEmailResponse);
rpc FindEmailById(FindEmailByIdRequest) returns (FindEmailByIdResponse);
rpc FindEmailsByReceiver(FindEmailsByReceiverRequest) returns (FindEmailsByReceiverResponse);
}
Generate the Go code from the proto file:
protoc --go_out=plugins=grpc:. *.proto
And initialize our gRPC service. Here I used the gRPC Middleware repository for unary interceptors:
l, err := net.Listen("tcp", s.cfg.Server.Port)
if err != nil {
return err
}
defer l.Close()
server := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: s.cfg.Server.MaxConnectionIdle * time.Minute,
Timeout: s.cfg.Server.Timeout * time.Second,
MaxConnectionAge: s.cfg.Server.MaxConnectionAge * time.Minute,
Time: s.cfg.Server.Timeout * time.Minute,
}),
grpc.UnaryInterceptor(im.Logger),
grpc.ChainUnaryInterceptor(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
grpcrecovery.UnaryServerInterceptor(),
),
)
emailGrpcMicroservice := mailGrpc.NewEmailMicroservice(emailUseCase, s.logger, s.cfg)
emailService.RegisterEmailServiceServer(server, emailGrpcMicroservice)
grpc_prometheus.Register(server)
s.logger.Info("Emails Service initialized")
For testing gRPC we can use evans; for that we need to register reflection:
reflection.Register(server)
Our gRPC server must implement the service interface from email.pb.go, so let's implement its methods.
The unary FindEmailsByReceiver handler receives a ctx with metadata and the request data.
First we start a tracing span, then prepare the input params and pass them to the use case method.
// Find emails by receiver address
func (e *EmailMicroservice) FindEmailsByReceiver(ctx context.Context, r *emailService.FindEmailsByReceiverRequest) (*emailService.FindEmailsByReceiverResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "EmailMicroservice.FindEmailsByReceiver")
defer span.Finish()
paginationQuery := &utils.PaginationQuery{
Size: r.GetSize(),
Page: r.GetPage(),
}
emails, err := e.emailUC.FindEmailsByReceiver(ctx, r.GetReceiverEmail(), paginationQuery)
if err != nil {
e.logger.Errorf("emailUC.FindEmailsByReceiver: %v", err)
return nil, status.Errorf(grpc_errors.ParseGRPCErrStatusCode(err), "emailUC.FindEmailsByReceiver: %v", err)
}
return &emailService.FindEmailsByReceiverResponse{
Emails: e.convertEmailsListToProto(emails.Emails),
TotalPages: emails.TotalPages,
TotalCount: emails.TotalCount,
HasMore: emails.HasMore,
Page: emails.Page,
Size: emails.Size,
}, nil
}
The FindEmailsByReceiver use case method is very simple:
// Find emails by receiver
func (e *EmailUseCase) FindEmailsByReceiver(ctx context.Context, to string, query *utils.PaginationQuery) (*models.EmailsList, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "EmailUseCase.FindEmailsByReceiver")
defer span.Finish()
return e.emailsRepo.FindEmailsByReceiver(ctx, to, query)
}
And the repository layer: we again create a tracing span and first run a query to count the existing rows. If there are none, the service returns a result with an empty slice.
It is important not to forget to call rows.Close().
Forgetting to close the rows variable means leaking connections. Combined with growing load on the server, this likely means running into max_connections errors or similar.
// Find emails by receiver
func (e *EmailsRepository) FindEmailsByReceiver(ctx context.Context, to string, query *utils.PaginationQuery) (list *models.EmailsList, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "EmailsRepository.FindEmailsByReceiver")
defer span.Finish()
var totalCount uint64
if err := e.db.QueryRowContext(ctx, totalCountQuery, to).Scan(&totalCount); err != nil {
return nil, errors.Wrap(err, "db.QueryRowContext")
}
if totalCount == 0 {
return &models.EmailsList{Emails: []*models.Email{}}, nil
}
rows, err := e.db.QueryxContext(ctx, findEmailByReceiverQuery, to, query.GetOffset(), query.GetLimit())
if err != nil {
return nil, errors.Wrap(err, "db.QueryxContext")
}
defer func() {
if closeErr := rows.Close(); closeErr != nil {
err = errors.Wrap(closeErr, "rows.Close")
}
}()
emails := make([]*models.Email, 0, query.GetSize())
for rows.Next() {
var mailTo string
email := &models.Email{}
if err := rows.Scan(
&email.EmailID,
&mailTo,
&email.From,
&email.Subject,
&email.Body,
&email.ContentType,
&email.CreatedAt,
); err != nil {
return nil, errors.Wrap(err, "rows.Scan")
}
email.SetToFromString(mailTo)
emails = append(emails, email)
}
// rows.Err reports errors hit during iteration, so it is checked after the loop
if err := rows.Err(); err != nil {
return nil, errors.Wrap(err, "rows.Err")
}
return &models.EmailsList{
TotalCount: totalCount,
TotalPages: utils.GetTotalPages(totalCount, query.GetSize()),
Page: query.GetPage(),
Size: query.GetSize(),
HasMore: utils.GetHasMore(query.GetPage(), totalCount, query.GetSize()),
Emails: emails,
}, err
}
The repository returns the result with pagination info; all utils and helpers can be found in the source code:
return &models.EmailsList{
TotalCount: totalCount,
TotalPages: utils.GetTotalPages(totalCount, query.GetSize()),
Page: query.GetPage(),
Size: query.GetSize(),
HasMore: utils.GetHasMore(query.GetPage(), totalCount, query.GetSize()),
Emails: emails,
}, err
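For readers who don't want to open the repository, here is one way such pagination helpers could look. This is a sketch under assumptions, not the repository's code: the names, the uint64 types and the 1-based page numbering are all hypothetical choices made for illustration.

```go
package main

import "fmt"

// PaginationQuery is a hypothetical stand-in for utils.PaginationQuery.
// Page is assumed to be 1-based.
type PaginationQuery struct {
	Size uint64
	Page uint64
}

// Offset returns how many rows to skip for the requested page.
func (q PaginationQuery) Offset() uint64 {
	if q.Page <= 1 {
		return 0
	}
	return (q.Page - 1) * q.Size
}

// TotalPages rounds up so that a partially filled last page is counted.
func TotalPages(totalCount, size uint64) uint64 {
	if size == 0 {
		return 0
	}
	return (totalCount + size - 1) / size
}

// HasMore reports whether there are pages after the current one.
func HasMore(page, totalCount, size uint64) bool {
	return page < TotalPages(totalCount, size)
}

func main() {
	q := PaginationQuery{Size: 10, Page: 3}
	total := uint64(25)
	fmt.Println("offset:", q.Offset())
	fmt.Println("total pages:", TotalPages(total, q.Size))
	fmt.Println("has more:", HasMore(q.Page, total, q.Size))
}
```

With 25 rows and a page size of 10, the third page starts at offset 20 and is the last one.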
Let's test it with evans:
The code for the other gRPC methods and some tests can be found in the repository.
For testing and mocking I can recommend testify and gomock.
The repository with the source code and the list of all the tools used can be found here :)
I hope this article is useful and helpful. I'll be happy to receive any feedback or questions :)