Go, RabbitMQ and gRPC Clean Architecture microservice 💫👋

This article shows how to build an example mail microservice, close to what you would see in production, using RabbitMQ, gRPC, Prometheus, Grafana monitoring and Jaeger opentracing ⚡️

The source code is available in the GitHub repository.

RabbitMQ and Kafka are very often used for asynchronous messaging and inter-service communication. Services communicate by exchanging messages over messaging channels.

The advantages of message-based systems are:

  • The sender only needs to know the location of the message broker, not the addresses of all possible receivers.
  • It’s possible to have multiple receivers for a message.
  • We can easily add new receivers without any changes in the sender.
  • Messages can be queued, ensuring delivery after a receiver has been down.

In short, senders and receivers are decoupled from each other.
When you build a system of microservices, you can end up with many, many services, and coordinating communication between them can be tricky.
RabbitMQ can act as a central broker, and services can simply subscribe to the types of messages they need.

First, run all the necessary Docker containers:

make local

The UIs will be available on the following ports:

RabbitMQ UI: http://localhost:15672

Jaeger UI: http://localhost:16686

Prometheus UI: http://localhost:9090

Grafana UI: http://localhost:3000

The RabbitMQ management plugin UI is available at http://localhost:15672; the default login/password is guest/guest:

This example docker-compose.local.yml file contains all the necessary images:

version: "3.8"

services:
  postgesql:
    image: postgres:13-alpine
    container_name: auth_postgesql
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=mails_db
    volumes:
      - ./pgdata:/var/lib/postgresql/data
    networks:
      - microservice_network

  rabbitmq:
    # There is a prebuilt RabbitMQ image; see
    # https://hub.docker.com/_/rabbitmq/ for details.
    # This variant is built on Alpine Linux (it's smaller) and includes
    # the management UI.
    image: 'rabbitmq:3.6-management-alpine'

    # These ports are exposed on the host; 'hostport:containerport'.
    # You could connect to this server from outside with the *host's*
    # DNS name or IP address and port 5672 (the left-hand side of the
    # colon).
    ports:
      # The standard AMQP protocol port
      - '5672:5672'
      # HTTP management UI
      - '15672:15672'

    # Run this container on a private network for this application.
    # This is necessary for magic Docker DNS to work: other containers
    # also running on this network will see a host name "rabbitmq"
    # (the name of this section) and the internal port 5672, even though
    # that's not explicitly published above.
    networks:
      - microservice_network

  prometheus:
    container_name: prometheus_container
    image: prom/prometheus
    volumes:
      - ./docker/monitoring/prometheus-local.yml:/etc/prometheus/prometheus.yml:Z
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--storage.tsdb.retention=20d'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'
    ports:
      - '9090:9090'
    networks:
      - microservice_network

  node_exporter:
    container_name: node_exporter_container
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks:
      - microservice_network

  grafana:
    container_name: grafana_container
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks:
      - microservice_network

networks:
  microservice_network:
    driver: bridge

As soon as the containers are running, the Prometheus and Grafana UIs are available too, and after sending some requests you can monitor the metrics on the dashboard:

The default Grafana login/password is admin/admin.


RabbitMQ has a Go client library and good documentation with a nice tutorial.

Dial connection:

// Initialize new RabbitMQ connection
func NewRabbitMQConn(cfg *config.Config) (*amqp.Connection, error) {
    connAddr := fmt.Sprintf(
        "amqp://%s:%s@%s:%s/",
        cfg.RabbitMQ.User,
        cfg.RabbitMQ.Password,
        cfg.RabbitMQ.Host,
        cfg.RabbitMQ.Port,
    )
    return amqp.Dial(connAddr)
}
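The cfg.RabbitMQ fields used above come from the service configuration. A minimal sketch of what that section of the config might look like (field names inferred from how they are used in this article, not copied from the repository):

// Assumed shape of the RabbitMQ config section; the real struct lives in the project's config package.
type RabbitMQ struct {
    Host       string
    Port       string
    User       string
    Password   string
    Exchange   string
    RoutingKey string
}

type Config struct {
    RabbitMQ RabbitMQ
    // Server, Postgres, Smtp and other sections omitted here
}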

Let's create our producer and consumer and pass amqpConn to the constructor:

// Emails RabbitMQ consumer
type EmailsConsumer struct {
    amqpConn *amqp.Connection
    logger   logger.Logger
    emailUC  email.EmailsUseCase
}
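A constructor for the consumer can simply store its dependencies; a sketch (the exact signature in the repository may differ):

// NewEmailsConsumer creates the consumer with its dependencies.
func NewEmailsConsumer(amqpConn *amqp.Connection, logger logger.Logger, emailUC email.EmailsUseCase) *EmailsConsumer {
    return &EmailsConsumer{amqpConn: amqpConn, logger: logger, emailUC: emailUC}
}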

The CreateChannel method opens a unique, concurrent server channel via amqpConn.Channel.
Next we declare the exchange and the queue, then bind them using the bindingKey routing key.
The ch.Qos method lets us prefetch messages: with a prefetch count greater than zero, the server will deliver that many messages to consumers before acknowledgments are received.

// CreateChannel declares the exchange and queue, binds them and sets QoS
func (c *EmailsConsumer) CreateChannel(exchangeName, queueName, bindingKey, consumerTag string) (*amqp.Channel, error) {
    ch, err := c.amqpConn.Channel()
    if err != nil {
        return nil, errors.Wrap(err, "Error amqpConn.Channel")
    }

    c.logger.Infof("Declaring exchange: %s", exchangeName)
    err = ch.ExchangeDeclare(
        exchangeName,
        exchangeKind,
        exchangeDurable,
        exchangeAutoDelete,
        exchangeInternal,
        exchangeNoWait,
        nil,
    )
    if err != nil {
        return nil, errors.Wrap(err, "Error ch.ExchangeDeclare")
    }

    queue, err := ch.QueueDeclare(
        queueName,
        queueDurable,
        queueAutoDelete,
        queueExclusive,
        queueNoWait,
        nil,
    )
    if err != nil {
        return nil, errors.Wrap(err, "Error ch.QueueDeclare")
    }

    c.logger.Infof("Declared queue, binding it to exchange: Queue: %v, messagesCount: %v, "+
        "consumerCount: %v, exchange: %v, bindingKey: %v",
        queue.Name,
        queue.Messages,
        queue.Consumers,
        exchangeName,
        bindingKey,
    )

    err = ch.QueueBind(
        queue.Name,
        bindingKey,
        exchangeName,
        queueNoWait,
        nil,
    )
    if err != nil {
        return nil, errors.Wrap(err, "Error ch.QueueBind")
    }

    c.logger.Infof("Queue bound to exchange, starting to consume from queue, consumerTag: %v", consumerTag)

    err = ch.Qos(
        prefetchCount,  // prefetch count
        prefetchSize,   // prefetch size
        prefetchGlobal, // global
    )
    if err != nil {
        return nil, errors.Wrap(err, "Error  ch.Qos")
    }

    return ch, nil
}

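The exchange, queue, prefetch and consume parameters referenced in CreateChannel and StartConsumer are package-level constants. A plausible set of values (these are assumptions; check the repository for the real ones):

// Assumed AMQP parameter constants used by CreateChannel and StartConsumer.
const (
    exchangeKind       = "direct"
    exchangeDurable    = true
    exchangeAutoDelete = false
    exchangeInternal   = false
    exchangeNoWait     = false

    queueDurable    = true
    queueAutoDelete = false
    queueExclusive  = false
    queueNoWait     = false

    prefetchCount  = 1
    prefetchSize   = 0
    prefetchGlobal = false

    consumeAutoAck   = false
    consumeExclusive = false
    consumeNoLocal   = false
    consumeNoWait    = false
)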

The StartConsumer method accepts the exchange and queue params and the worker pool size:

// Start new rabbitmq consumer
func (c *EmailsConsumer) StartConsumer(workerPoolSize int, exchange, queueName, bindingKey, consumerTag string) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    ch, err := c.CreateChannel(exchange, queueName, bindingKey, consumerTag)
    if err != nil {
        return errors.Wrap(err, "CreateChannel")
    }
    defer ch.Close()

    deliveries, err := ch.Consume(
        queueName,
        consumerTag,
        consumeAutoAck,
        consumeExclusive,
        consumeNoLocal,
        consumeNoWait,
        nil,
    )
    if err != nil {
        return errors.Wrap(err, "Consume")
    }

    for i := 0; i < workerPoolSize; i++ {
        go c.worker(ctx, deliveries)
    }

    chanErr := <-ch.NotifyClose(make(chan *amqp.Error))
    c.logger.Errorf("ch.NotifyClose: %v", chanErr)
    return chanErr
}

A very popular phrase in Go is: "Do not communicate by sharing memory; instead, share memory by communicating."

We run as many workers as the concurrency we need:

    for i := 0; i < workerPoolSize; i++ {
        go c.worker(ctx, deliveries)
    }

Each worker reads from the deliveries channel and processes each message:

All deliveries in AMQP must be acknowledged.

If you called Channel.Consume with autoAck true, then the server will automatically ack each message and
this method should not be called. Otherwise, you must call Delivery.Ack after
you have successfully processed the delivery.

Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.

Reject delegates a negative acknowledgement through the Acknowledger interface.

func (c *EmailsConsumer) worker(ctx context.Context, messages <-chan amqp.Delivery) {

    for delivery := range messages {
        span, ctx := opentracing.StartSpanFromContext(ctx, "EmailsConsumer.worker")

        c.logger.Infof("processDeliveries deliveryTag% v", delivery.DeliveryTag)

        incomingMessages.Inc()

        err := c.emailUC.SendEmail(ctx, delivery.Body)
        if err != nil {
            if err := delivery.Reject(false); err != nil {
                c.logger.Errorf("Err delivery.Reject: %v", err)
            }
            c.logger.Errorf("Failed to process delivery: %v", err)
            errorMessages.Inc()
        } else {
            err = delivery.Ack(false)
            if err != nil {
                c.logger.Errorf("Failed to acknowledge delivery: %v", err)
            }
            successMessages.Inc()
        }
        span.Finish()
    }

    c.logger.Info("Deliveries channel closed")
}

The incomingMessages.Inc(), successMessages.Inc() and errorMessages.Inc() calls increase the Prometheus counters we created earlier:

var (
    incomingMessages = promauto.NewCounter(prometheus.CounterOpts{
        Name: "emails_incoming_rabbitmq_messages_total",
        Help: "The total number of incoming RabbitMQ messages",
    })
    successMessages = promauto.NewCounter(prometheus.CounterOpts{
        Name: "emails_success_incoming_rabbitmq_messages_total",
        Help: "The total number of success incoming success RabbitMQ messages",
    })
    errorMessages = promauto.NewCounter(prometheus.CounterOpts{
        Name: "emails_error_incoming_rabbitmq_message_total",
        Help: "The total number of error incoming success RabbitMQ messages",
    })
)
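These counters only matter once Prometheus can scrape them, so the service also has to expose an HTTP metrics endpoint. A minimal sketch (the address and path here are assumptions, not the project's actual values):

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// RunMetricsServer serves the default Prometheus registry, which promauto
// registers the counters above into, e.g. on ":7070" at /metrics.
func RunMetricsServer(addr string) error {
    mux := http.NewServeMux()
    mux.Handle("/metrics", promhttp.Handler())
    return http.ListenAndServe(addr, mux)
}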

In the worker we call the usecase method SendEmail; here we deserialize and sanitize the delivery body and then send the email using an SMTP server:

func (e *EmailUseCase) SendEmail(ctx context.Context, deliveryBody []byte) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "EmailUseCase.SendEmail")
    defer span.Finish()

    mail := &models.Email{}
    if err := json.Unmarshal(deliveryBody, mail); err != nil {
        return errors.Wrap(err, "json.Unmarshal")
    }

    mail.Body = utils.SanitizeString(mail.Body)

    mail.From = e.cfg.Smtp.User
    if err := utils.ValidateStruct(ctx, mail); err != nil {
        return errors.Wrap(err, "ValidateStruct")
    }

    if err := e.mailer.Send(ctx, mail); err != nil {
        return errors.Wrap(err, "mailer.Send")
    }

    createdEmail, err := e.emailsRepo.CreateEmail(ctx, mail)
    if err != nil {
        return errors.Wrap(err, "emailsRepo.CreateEmail")
    }

    span.LogFields(log.String("emailID", createdEmail.EmailID.String()))
    e.logger.Infof("Success send email: %v", createdEmail.EmailID)
    return nil
}

I use Gomail to send emails; of course an SMTP server is required, but you can find many free options for testing. The Send method is very simple:

// Send email
func (m *Mailer) Send(ctx context.Context, email *models.Email) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "Mailer.Send")
    defer span.Finish()

    gm := gomail.NewMessage()
    gm.SetHeader("From", email.From)
    gm.SetHeader("To", email.To...)
    gm.SetHeader("Subject", email.Subject)
    gm.SetBody(email.ContentType, email.Body)

    return m.mailDialer.DialAndSend(gm)
}
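The mailDialer used above is a gomail dialer built from the SMTP settings; a possible constructor (cfg.Smtp.Host, Port and Password are assumed config fields, only cfg.Smtp.User appears in the article's code):

import gomail "gopkg.in/gomail.v2"

// Mailer wraps a gomail dialer configured from the SMTP settings.
type Mailer struct {
    mailDialer *gomail.Dialer
}

// NewMailer builds the SMTP dialer from the service config.
func NewMailer(cfg *config.Config) *Mailer {
    return &Mailer{mailDialer: gomail.NewDialer(cfg.Smtp.Host, cfg.Smtp.Port, cfg.Smtp.User, cfg.Smtp.Password)}
}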

After successfully sending the email, let's save it in Postgres; for SQL databases a good combination is sqlx and pgx.

// Create email
func (e *EmailsRepository) CreateEmail(ctx context.Context, email *models.Email) (*models.Email, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "EmailsRepository.CreateEmail")
    defer span.Finish()

    var id uuid.UUID
    if err := e.db.QueryRowContext(
        ctx,
        createEmailQuery,
        email.GetToString(),
        email.From,
        email.Subject,
        email.Body,
        email.ContentType,
    ).Scan(&id); err != nil {
        return nil, errors.Wrap(err, "db.QueryRowxContext")
    }

    email.EmailID = id
    return email, nil
}
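The createEmailQuery constant is a plain INSERT that returns the generated id. A plausible version (the table and column names are assumptions based on the scanned fields):

// Assumed SQL for createEmailQuery; the real table and column names live in the repository.
const createEmailQuery = `INSERT INTO emails (email_to, email_from, subject, body, content_type)
    VALUES ($1, $2, $3, $4, $5)
    RETURNING email_id`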

As you can see, on every layer we first create a tracing span.

Open http://localhost:16686/ to check how it looks in Jaeger:


    span, ctx := opentracing.StartSpanFromContext(ctx, "EmailsRepository.CreateEmail")
    defer span.Finish()
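For these spans to show up in Jaeger, a tracer has to be registered as the opentracing global tracer at startup. A minimal sketch using jaeger-client-go (the service name and sampler settings are assumptions):

import (
    "io"

    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    jaegercfg "github.com/uber/jaeger-client-go/config"
)

// InitJaeger creates a tracer that samples every request and sets it as the global tracer.
func InitJaeger(serviceName string) (opentracing.Tracer, io.Closer, error) {
    cfg := jaegercfg.Configuration{
        ServiceName: serviceName,
        Sampler: &jaegercfg.SamplerConfig{
            Type:  jaeger.SamplerTypeConst,
            Param: 1,
        },
        Reporter: &jaegercfg.ReporterConfig{LogSpans: true},
    }
    tracer, closer, err := cfg.NewTracer()
    if err != nil {
        return nil, nil, err
    }
    opentracing.SetGlobalTracer(tracer)
    return tracer, closer, nil
}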

Our consumer is done; next we create the publisher, which has a very simple Publish method:

// Publish message
func (p *EmailsPublisher) Publish(body []byte, contentType string) error {

    p.logger.Infof("Publishing message Exchange: %s, RoutingKey: %s", p.cfg.RabbitMQ.Exchange, p.cfg.RabbitMQ.RoutingKey)

    if err := p.amqpChan.Publish(
        p.cfg.RabbitMQ.Exchange,
        p.cfg.RabbitMQ.RoutingKey,
        publishMandatory,
        publishImmediate,
        amqp.Publishing{
            ContentType:  contentType,
            DeliveryMode: amqp.Persistent,
            MessageId:    uuid.New().String(),
            Timestamp:    time.Now(),
            Body:         body,
        },
    ); err != nil {
        return errors.Wrap(err, "ch.Publish")
    }

    publishedMessages.Inc()
    return nil
}
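The publisher holds an open channel (amqpChan), the config and a logger; a possible constructor (names assumed from how they are used in Publish):

// EmailsPublisher publishes messages to the configured exchange.
type EmailsPublisher struct {
    amqpChan *amqp.Channel
    cfg      *config.Config
    logger   logger.Logger
}

// NewEmailsPublisher opens a dedicated channel on the connection for publishing.
func NewEmailsPublisher(amqpConn *amqp.Connection, cfg *config.Config, logger logger.Logger) (*EmailsPublisher, error) {
    ch, err := amqpConn.Channel()
    if err != nil {
        return nil, errors.Wrap(err, "amqpConn.Channel")
    }
    return &EmailsPublisher{amqpChan: ch, cfg: cfg, logger: logger}, nil
}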

Our microservice works with RabbitMQ and communicates with other services over gRPC, so let's create the proto file; it has 3 methods:

service EmailService{
  rpc SendEmails(SendEmailRequest) returns (SendEmailResponse);
  rpc FindEmailById(FindEmailByIdRequest) returns (FindEmailByIdResponse);
  rpc FindEmailsByReceiver(FindEmailsByReceiverRequest) returns (FindEmailsByReceiverResponse);
}

Generate the Go code from the proto file:

protoc --go_out=plugins=grpc:. *.proto

And initialize our gRPC service; here I used the gRPC Middleware repository for unary interceptors:

l, err := net.Listen("tcp", s.cfg.Server.Port)
    if err != nil {
        return err
    }
    defer l.Close()

    server := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
        MaxConnectionIdle: s.cfg.Server.MaxConnectionIdle * time.Minute,
        Timeout:           s.cfg.Server.Timeout * time.Second,
        MaxConnectionAge:  s.cfg.Server.MaxConnectionAge * time.Minute,
        Time:              s.cfg.Server.Timeout * time.Minute,
    }),
        grpc.UnaryInterceptor(im.Logger),
        grpc.ChainUnaryInterceptor(
            grpc_ctxtags.UnaryServerInterceptor(),
            grpc_prometheus.UnaryServerInterceptor,
            grpcrecovery.UnaryServerInterceptor(),
        ),
    )

    emailGrpcMicroservice := mailGrpc.NewEmailMicroservice(emailUseCase, s.logger, s.cfg)
    emailService.RegisterEmailServiceServer(server, emailGrpcMicroservice)
    grpc_prometheus.Register(server)
    s.logger.Info("Emails Service initialized")
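After registration, the server still has to be started on the listener; the remaining step looks roughly like this (graceful shutdown handling omitted):

    // Block serving gRPC requests on the listener created above.
    if err := server.Serve(l); err != nil {
        s.logger.Errorf("server.Serve: %v", err)
        return err
    }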

For testing gRPC we can use evans, and we need to add reflection:

reflection.Register(server)

Our gRPC server must implement the service interface from email.pb.go, so let's implement the methods.

The unary FindEmailsByReceiver handler receives a ctx with metadata and the request data.

First we start a tracing span, then prepare and pass the input params to the usecase method:

// Find emails by receiver address
func (e *EmailMicroservice) FindEmailsByReceiver(ctx context.Context, r *emailService.FindEmailsByReceiverRequest) (*emailService.FindEmailsByReceiverResponse, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "EmailMicroservice.FindEmailsByReceiver")
    defer span.Finish()

    paginationQuery := &utils.PaginationQuery{
        Size: r.GetSize(),
        Page: r.GetPage(),
    }

    emails, err := e.emailUC.FindEmailsByReceiver(ctx, r.GetReceiverEmail(), paginationQuery)
    if err != nil {
        e.logger.Errorf("emailUC.FindEmailsByReceiver: %v", err)
        return nil, status.Errorf(grpc_errors.ParseGRPCErrStatusCode(err), "emailUC.FindEmailsByReceiver: %v", err)
    }

    return &emailService.FindEmailsByReceiverResponse{
        Emails:     e.convertEmailsListToProto(emails.Emails),
        TotalPages: emails.TotalPages,
        TotalCount: emails.TotalCount,
        HasMore:    emails.HasMore,
        Page:       emails.Page,
        Size:       emails.Size,
    }, nil
}

The FindEmailsByReceiver usecase method is very simple:

// Find emails by receiver
func (e *EmailUseCase) FindEmailsByReceiver(ctx context.Context, to string, query *utils.PaginationQuery) (*models.EmailsList, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "EmailUseCase.FindEmailsByReceiver")
    defer span.Finish()

    return e.emailsRepo.FindEmailsByReceiver(ctx, to, query)
}

And the repository layer: we again create the tracing span and first query the number of existing rows; if there are none, the service returns a result with an empty slice.

The important thing here is not to forget to call rows.Close().

Forgetting to close the rows variable means leaking connections. Combined with growing load on the server, this likely means running into max_connections errors or similar.

// Find emails by receiver
func (e *EmailsRepository) FindEmailsByReceiver(ctx context.Context, to string, query *utils.PaginationQuery) (list *models.EmailsList, err error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "EmailsRepository.FindEmailsByReceiver")
    defer span.Finish()

    var totalCount uint64
    if err := e.db.QueryRowContext(ctx, totalCountQuery, to).Scan(&totalCount); err != nil {
        return nil, errors.Wrap(err, "db.QueryRowContext")
    }
    if totalCount == 0 {
        return &models.EmailsList{Emails: []*models.Email{}}, nil
    }

    rows, err := e.db.QueryxContext(ctx, findEmailByReceiverQuery, to, query.GetOffset(), query.GetLimit())
    if err != nil {
        return nil, errors.Wrap(err, "db.QueryxContext")
    }
    defer func() {
        if closeErr := rows.Close(); closeErr != nil {
            err = errors.Wrap(closeErr, "rows.Close")
        }
    }()

    emails := make([]*models.Email, 0, query.GetSize())
    for rows.Next() {
        var mailTo string
        email := &models.Email{}
        if err := rows.Scan(
            &email.EmailID,
            &mailTo,
            &email.From,
            &email.Subject,
            &email.Body,
            &email.ContentType,
            &email.CreatedAt,
        ); err != nil {
            return nil, errors.Wrap(err, "rows.Scan")
        }
        email.SetToFromString(mailTo)
        emails = append(emails, email)
    }

    // Check for any error encountered during iteration.
    if err := rows.Err(); err != nil {
        return nil, errors.Wrap(err, "rows.Err")
    }

    return &models.EmailsList{
        TotalCount: totalCount,
        TotalPages: utils.GetTotalPages(totalCount, query.GetSize()),
        Page:       query.GetPage(),
        Size:       query.GetSize(),
        HasMore:    utils.GetHasMore(query.GetPage(), totalCount, query.GetSize()),
        Emails:     emails,
    }, err
}


The repository returns the result with pagination info; all the utils and helpers can be found in the source code:

return &models.EmailsList{
        TotalCount: totalCount,
        TotalPages: utils.GetTotalPages(totalCount, query.GetSize()),
        Page:       query.GetPage(),
        Size:       query.GetSize(),
        HasMore:    utils.GetHasMore(query.GetPage(), totalCount, query.GetSize()),
        Emails:     emails,
    }, err
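GetTotalPages and GetHasMore are small pagination helpers. A possible implementation (the signatures are assumed from the calls above):

// Assumed pagination helpers; the real ones live in the project's utils package.
func GetTotalPages(totalCount, pageSize uint64) uint64 {
    if pageSize == 0 {
        return 0
    }
    pages := totalCount / pageSize
    if totalCount%pageSize != 0 {
        pages++
    }
    return pages
}

func GetHasMore(currentPage, totalCount, pageSize uint64) bool {
    return currentPage*pageSize < totalCount
}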

Let's test it with evans:
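A REPL session against the running service can be started with something like this (the port is an assumption; use whatever address the gRPC server listens on):

evans --host localhost --port 5001 -r repl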

Code for the other gRPC methods and some tests can be found in the repository.

For testing and mocking I can recommend testify and gomock.

The repository with the source code and a list of all the tools used can be found here 👨‍💻 :)
I hope this article was useful and helpful. I'll be happy to receive any feedback or questions :)
