DEV Community

Alexander

Go, NATS, gRPC and PostgreSQL clean architecture microservice with monitoring and tracing 👋

Hi, in this article I've tried to build a microservice using: 🚀
NATS as a message broker
gRPC, the Go implementation of gRPC
PostgreSQL as the database
Jaeger for open source, end-to-end distributed tracing
Prometheus for monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
MailHog for web and API based SMTP testing
Redis with a type-safe Redis client for Golang
Nginx as the HTTP and reverse proxy server
swag, Swagger for Go
migrate for migrations

You can find the source code in the GitHub repository.

In this article I want to focus on NATS Streaming, which is written in Go 🤩.

As we can see in the documentation, it has some nice features:

  • Message/event persistence: NATS Streaming offers configurable message persistence: in-memory, flat files or database.
  • At-least-once delivery: message acknowledgements between publisher and server (for publish operations) and between subscriber and server (to confirm message delivery). Messages are persisted by the server in memory or secondary storage (or other external storage) and will be redelivered to eligible subscribing clients as needed.
  • Publisher rate limiting: the MaxPubAcksInFlight option effectively limits the number of unacknowledged messages that a publisher may have in flight at any given time. When this maximum is reached, further async publish calls will block until the number of unacknowledged messages falls below the specified limit.
  • Rate matching/limiting per subscriber: the MaxInFlight option designates the maximum number of outstanding acknowledgements (messages that have been delivered but not acknowledged) that NATS Streaming will allow for a given subscription. When this limit is reached, NATS Streaming will suspend delivery of messages to this subscription until the number of unacknowledged messages falls below the specified limit.
  • Durable subscriptions: subscriptions may specify a "durable name" which will survive client restarts.
  • Historical message replay by subject: new subscriptions may specify a start position in the stream of messages stored for the subscribed subject's channel. By using this option, message delivery may begin at:
    • The earliest message stored for this subject
    • The most recently stored message for this subject, prior to the start of the current subscription. This is commonly thought of as "last value" or "initial value" caching.
    • A specific date/time in nanoseconds
    • An historical offset from the current server date/time, e.g. the last 30 seconds.
    • A specific message sequence number
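
The start positions listed above can be illustrated with a small in-memory model. This is only a sketch using the standard library: storedMsg, demoLog and the from* helpers are hypothetical names invented for this example, not part of NATS Streaming. In the stan.go client the corresponding subscription options are, as far as I know, stan.DeliverAllAvailable(), stan.StartWithLastReceived(), stan.StartAtTime(), stan.StartAtTimeDelta() and stan.StartAtSequence().

```go
package main

import (
	"fmt"
	"time"
)

// storedMsg is a hypothetical stand-in for a message persisted in a channel.
type storedMsg struct {
	Seq  uint64
	Time time.Time
	Data string
}

// Each helper returns the messages a new subscription would receive from a
// log that is already sorted by sequence number and time.

// fromEarliest replays everything stored for the subject.
func fromEarliest(log []storedMsg) []storedMsg { return log }

// fromLast replays only the most recently stored message.
func fromLast(log []storedMsg) []storedMsg {
	if len(log) == 0 {
		return nil
	}
	return log[len(log)-1:]
}

// fromTime replays messages stored at or after t.
func fromTime(log []storedMsg, t time.Time) []storedMsg {
	for i, m := range log {
		if !m.Time.Before(t) {
			return log[i:]
		}
	}
	return nil
}

// fromDelta replays messages stored within the last d, relative to now.
func fromDelta(log []storedMsg, now time.Time, d time.Duration) []storedMsg {
	return fromTime(log, now.Add(-d))
}

// fromSequence replays messages whose sequence number is >= seq.
func fromSequence(log []storedMsg, seq uint64) []storedMsg {
	for i, m := range log {
		if m.Seq >= seq {
			return log[i:]
		}
	}
	return nil
}

// demoLog builds five messages stored one minute apart, the last one at "now".
func demoLog() ([]storedMsg, time.Time) {
	now := time.Date(2021, 1, 1, 12, 0, 0, 0, time.UTC)
	log := make([]storedMsg, 0, 5)
	for i := 1; i <= 5; i++ {
		log = append(log, storedMsg{
			Seq:  uint64(i),
			Time: now.Add(-time.Duration(5-i) * time.Minute),
			Data: fmt.Sprintf("msg-%d", i),
		})
	}
	return log, now
}

func main() {
	log, now := demoLog()
	fmt.Println("earliest:", len(fromEarliest(log)))
	fmt.Println("last:", fromLast(log)[0].Data)
	fmt.Println("last 90s:", len(fromDelta(log, now, 90*time.Second)))
	fmt.Println("from seq 3:", len(fromSequence(log, 3)))
}
```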

For this example I didn't implement any interesting business logic and didn't write tests.
The microservice sends emails and saves them to PostgreSQL, and it can communicate over NATS, gRPC and REST.
NATS subscribers handle create email events, while REST and gRPC are used for querying data.

For local development:



make cert # generates TLS certificates
make migrate_up # runs SQL migrations
make swagger # generates Swagger documentation
make local # or: make develop, runs the docker compose files



To run everything in Docker, use make develop; it has a hot reloading feature.

The UIs will be available on these ports:

Jaeger UI: http://localhost:16686

Prometheus UI: http://localhost:9090

Grafana UI: http://localhost:3000

NATS UI: http://localhost:8222/

MailHog: http://localhost:8025/

Swagger UI by default will run on: https://localhost:5000/swagger/index.html



In Grafana you need to choose Prometheus as the metrics source and then create a dashboard.

Docker-compose.local.yml for this project:



version: "3.8"

services:
  nginx:
    container_name: nginx_microservice
    ports:
      - 8080:8080
      - 443:443
    build:
      context: ./nginx
      dockerfile: Dockerfile
    networks:
      - nats

  nats-streaming:
    container_name: nats-streaming
    image: nats-streaming:latest
    ports:
      - "8222:8222"
      - "4222:4222"
      - "6222:6222"
    networks: [ "nats" ]
    restart: always
    command: [
        '-p',
        '4222',
        '-m',
        '8222',
        '-hbi',
        '5s',
        '-hbt',
        '5s',
        '-hbf',
        '2',
        '-SD',
        '-cid',
        'microservice',
    ]

  mails_postgesql:
    image: postgres:13-alpine
    container_name: mails_postgesql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=mails_db
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./mails_pgdata:/var/lib/postgresql/data
    networks: [ "nats" ]

  mailhog:
    container_name: mailhog
    image: mailhog/mailhog:latest
    ports:
      - "1025:1025"
      - "8025:8025"
    restart: always
    networks: [ "nats" ]


  redis:
    image: redis:6-alpine
    restart: always
    container_name: user_redis
    ports:
      - "6379:6379"
    networks: [ "nats" ]

  prometheus:
    container_name: prometheus_container
    restart: always
    image: prom/prometheus
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:Z
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--storage.tsdb.retention=20d'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'
    ports:
      - '9090:9090'
    networks: [ "nats" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "nats" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks: [ "nats" ]


  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - 5775:5775/udp
      - 6831:6831/udp
      - 6832:6832/udp
      - 5778:5778
      - 16686:16686
      - 14268:14268
      - 14250:14250
      - 9411:9411
    networks: [ "nats" ]


networks:
  nats:
    name: nats



At application start we load the YAML config using viper, initialize everything we need and run the application:



// Run start application
func (s *server) Run() error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    smtpClient := smtp.NewSmtpClient(s.cfg)
    publisher := nats.NewPublisher(s.natsConn)
    emailPgRepo := repository.NewEmailPGRepository(s.pgxPool)
    emailRedisRepo := repository.NewEmailRedisRepository(s.redis)
    emailUC := usecase.NewEmailUseCase(s.log, emailPgRepo, publisher, smtpClient, emailRedisRepo)

    im := interceptors.NewInterceptorManager(s.log, s.cfg)
    mw := middlewares.NewMiddlewareManager(s.log, s.cfg)

    validate := validator.New()

    go func() {
        emailSubscriber := nats.NewEmailSubscriber(s.natsConn, s.log, emailUC, validate)
        emailSubscriber.Run(ctx)
    }()

    go func() {
        s.log.Infof("Server is listening on PORT: %s", s.cfg.HTTP.Port)
        s.runHttpServer()
    }()

    metricsServer := echo.New()
    go func() {
        metricsServer.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
        s.log.Infof("Metrics server is running on port: %s", s.cfg.Metrics.Port)
        if err := metricsServer.Start(s.cfg.Metrics.Port); err != nil {
            s.log.Error(err)
            cancel()
        }
    }()
    v1 := s.echo.Group("/api/v1")
    v1.Use(mw.Metrics)

    emailHandlers := emailsV1.NewEmailHandlers(v1.Group("/email"), emailUC, s.log, validate)
    emailHandlers.MapRoutes()

    l, err := net.Listen("tcp", s.cfg.GRPC.Port)
    if err != nil {
        return errors.Wrap(err, "net.Listen")
    }
    defer l.Close()

    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        s.log.Fatalf("failed to load key pair: %s", err)
    }

    grpcServer := grpc.NewServer(
        grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
        grpc.KeepaliveParams(keepalive.ServerParameters{
            MaxConnectionIdle: s.cfg.GRPC.MaxConnectionIdle * time.Minute,
            Timeout:           s.cfg.GRPC.Timeout * time.Second,
            MaxConnectionAge:  s.cfg.GRPC.MaxConnectionAge * time.Minute,
            Time:              s.cfg.GRPC.Timeout * time.Minute,
        }),
        grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
            grpc_ctxtags.UnaryServerInterceptor(),
            grpc_opentracing.UnaryServerInterceptor(),
            grpc_prometheus.UnaryServerInterceptor,
            grpcrecovery.UnaryServerInterceptor(),
            im.Logger,
        ),
        ),
    )

    emailGRPCService := emailGrpc.NewEmailGRPCService(emailUC, s.log, validate)
    emailService.RegisterEmailServiceServer(grpcServer, emailGRPCService)
    grpc_prometheus.Register(grpcServer)

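    // Reflection must be registered before the server starts serving
    if s.cfg.HTTP.Development {
        reflection.Register(grpcServer)
    }

    // Serve blocks, so run it in a goroutine; otherwise the shutdown logic below is never reached
    go func() {
        s.log.Infof("GRPC Server is listening on port: %s", s.cfg.GRPC.Port)
        if err := grpcServer.Serve(l); err != nil {
            s.log.Errorf("grpcServer.Serve: %v", err)
            cancel()
        }
    }()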

    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

    select {
    case v := <-quit:
        s.log.Errorf("signal.Notify: %v", v)
    case done := <-ctx.Done():
        s.log.Errorf("ctx.Done: %v", done)
    }

    if err := s.echo.Server.Shutdown(ctx); err != nil {
        return errors.Wrap(err, "echo.Server.Shutdown")
    }

    if err := metricsServer.Shutdown(ctx); err != nil {
        s.log.Errorf("metricsServer.Shutdown: %v", err)
    }

    grpcServer.GracefulStop()
    s.log.Info("Server Exited Properly")

    return nil
}



For the REST API I used echo (another good and popular choice, in my opinion, is gin),
and swag to generate the RESTful API documentation.
The create email handler accepts the request, starts a tracing span, validates the input using validator and calls the usecase method:



// Create Create
// @Tags Emails
// @Summary Create new email
// @Description Create new email and send it
// @Accept json
// @Produce json
// @Success 201 {object} models.Email
// @Router /email [post]
func (h *emailHandlers) Create() echo.HandlerFunc {
    return func(c echo.Context) error {
        span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "emailHandlers.Create")
        defer span.Finish()
        createRequests.Inc()

        var mail models.Email
        if err := c.Bind(&mail); err != nil {
            errorRequests.Inc()
            h.log.Errorf("c.Bind: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        if err := h.validate.StructCtx(ctx, &mail); err != nil {
            errorRequests.Inc()
            h.log.Errorf("validate.StructCtx: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        if err := h.emailUC.PublishCreate(ctx, &mail); err != nil {
            errorRequests.Inc()
            h.log.Errorf("emailUC.PublishCreate: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        successRequests.Inc()
        return c.NoContent(http.StatusCreated)
    }
}
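
The bind, validate, call flow of this handler does not depend on echo. Here is a minimal sketch of the same flow with net/http only, assuming a trimmed-down email type and a hand-written validate function in place of models.Email and the validator library; createHandler, post and the JSON field names are made up for this example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// email is a hypothetical, trimmed-down version of models.Email.
type email struct {
	From    string `json:"from"`
	To      string `json:"to"`
	Subject string `json:"subject"`
	Message string `json:"message"`
}

// validate is a hand-written stand-in for the validator library.
func validate(m email) error {
	switch {
	case !strings.Contains(m.From, "@"):
		return errors.New("from: invalid address")
	case !strings.Contains(m.To, "@"):
		return errors.New("to: invalid address")
	case m.Subject == "":
		return errors.New("subject: required")
	}
	return nil
}

// createHandler binds the JSON body, validates it and hands it to publish.
func createHandler(publish func(email) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var m email
		if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if err := validate(m); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if err := publish(m); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusCreated)
	}
}

// post sends body to the handler in-process and returns the status code.
func post(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/api/v1/email", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := createHandler(func(m email) error { return nil }) // no-op publisher
	fmt.Println(post(h, `{"from":"a@example.com","to":"b@example.com","subject":"hi","message":"hello"}`))
	fmt.Println(post(h, `{"from":"not-an-address"}`))
}
```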



For the gRPC service method the idea is the same, but first we need to define the proto file and generate the Go code from it:



syntax = "proto3";

import "google/protobuf/timestamp.proto";

//protoc --go_out=plugins=grpc:. *.proto

package emailService;
option go_package = ".;emailService";

message Email {
  string EmailID = 1;
  string From = 2;
  string To = 3;
  string Subject = 4;
  string Message = 5;
  google.protobuf.Timestamp CreatedAt = 6;
}

message Empty {}

message CreateReq {
  string From = 1;
  string To = 2;
  string Subject = 3;
  string Message = 4;
}

message CreateRes {
  string status = 1;
}

message GetByIDReq {
  string EmailID = 1;
}

message GetByIDRes {
  Email Email = 1;
}

message SearchReq {
  string Search = 1;
  int64 page = 2;
  int64 size = 3;
}

message SearchRes {
  int64 TotalCount = 1;
  int64 TotalPages = 2;
  int64 Page = 3;
  int64 Size = 4;
  bool HasMore = 5;
  repeated Email Emails = 6;
}

service EmailService {
  rpc Create(CreateReq) returns (CreateRes) {}
  rpc GetByID(GetByIDReq) returns (GetByIDRes) {}
  rpc Search(SearchReq) returns (SearchRes) {}
}



The business logic of the gRPC create email handler is the same as in REST:



// Create create email
func (e *emailGRPCService) Create(ctx context.Context, req *emailService.CreateReq) (*emailService.CreateRes, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "emailGRPCService.Create")
    defer span.Finish()
    createRequests.Inc()

    m := &models.Email{
        From:    req.GetFrom(),
        To:      req.GetTo(),
        Subject: req.GetSubject(),
        Message: req.GetMessage(),
    }

    if err := e.validator.StructCtx(ctx, m); err != nil {
        errorRequests.Inc()
        e.log.Errorf("validator.StructCtx: %v", err)
        return nil, grpcErrors.ErrorResponse(err, err.Error())
    }

    if err := e.emailUC.Create(ctx, m); err != nil {
        errorRequests.Inc()
        e.log.Errorf("emailUC.Create: %v", err)
        return nil, grpcErrors.ErrorResponse(err, err.Error())
    }

    successRequests.Inc()
    return &emailService.CreateRes{Status: "Ok"}, nil
}



The create email usecase creates the record in the database and publishes a send email event:



// Create create new email saves in db
func (e *emailUseCase) Create(ctx context.Context, email *models.Email) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "emailUseCase.Create")
    defer span.Finish()

    created, err := e.emailPGRepo.Create(ctx, email)
    if err != nil {
        return errors.Wrap(err, "emailPGRepo.Create")
    }

    mailBytes, err := json.Marshal(created)
    if err != nil {
        return errors.Wrap(err, "json.Marshal")
    }

    return e.publisher.Publish(sendEmailSubject, mailBytes)
}
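
Because the usecase only talks to a repository and a publisher, it can be exercised without PostgreSQL or NATS. The following sketch assumes two small interfaces and in-memory fakes; the type names, the JSON field names and the "mail:send" subject are hypothetical and chosen only for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// email is a hypothetical, trimmed-down model.
type email struct {
	ID      int    `json:"id"`
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// The use case depends on these small interfaces, not on pgx or NATS directly.
type repository interface {
	Create(e email) (email, error)
}

type publisher interface {
	Publish(subject string, data []byte) error
}

type useCase struct {
	repo repository
	pub  publisher
}

// Create saves first and publishes only when the save succeeded.
func (u useCase) Create(e email) error {
	created, err := u.repo.Create(e)
	if err != nil {
		return fmt.Errorf("repo.Create: %w", err)
	}
	data, err := json.Marshal(created)
	if err != nil {
		return fmt.Errorf("json.Marshal: %w", err)
	}
	return u.pub.Publish("mail:send", data) // hypothetical subject name
}

// memRepo is an in-memory fake repository that can be told to fail.
type memRepo struct {
	fail bool
	rows []email
}

func (r *memRepo) Create(e email) (email, error) {
	if r.fail {
		return email{}, errors.New("db unavailable")
	}
	e.ID = len(r.rows) + 1
	r.rows = append(r.rows, e)
	return e, nil
}

// memPub is an in-memory fake publisher that records what was sent.
type memPub struct{ sent []string }

func (p *memPub) Publish(subject string, data []byte) error {
	p.sent = append(p.sent, subject+" "+string(data))
	return nil
}

func main() {
	repo, pub := &memRepo{}, &memPub{}
	uc := useCase{repo: repo, pub: pub}
	if err := uc.Create(email{To: "b@example.com", Subject: "hi"}); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(pub.sent)
}
```

Note that saving and publishing are two separate steps: if the publish fails after a successful save, the record exists but no event was sent, so the caller has to decide how to handle that case.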



The repository create method saves the data to the database.

pgx is used for interacting with Postgres:



// Create create new email
func (e *emailPGRepository) Create(ctx context.Context, email *models.Email) (*models.Email, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "emailPGRepository.Create")
    defer span.Finish()

    var mail models.Email
    if err := e.db.QueryRow(
        ctx,
        createEmailQuery,
        &email.From,
        &email.To,
        &email.Subject,
        &email.Message,
    ).Scan(&mail.EmailID, &mail.From, &mail.To, &mail.Subject, &mail.Message, &mail.CreatedAt); err != nil {
        return nil, errors.Wrap(err, "Scan")
    }

    return &mail, nil
}



Jaeger has a nice web interface for viewing our traces, available at http://localhost:16686

MailHog is a good solution for email testing,
and go-simple-mail is used here as the Go SMTP client.

NATS has a very simple web UI out of the box:

The subscriber is the most interesting part. First we subscribe to a subject using a worker pool:



// Subscribe subscribe to subject and run workers with given callback for handling messages
func (s *emailSubscriber) Subscribe(subject, qgroup string, workersNum int, cb stan.MsgHandler) {
    s.log.Infof("Subscribing to Subject: %v, group: %v", subject, qgroup)
    wg := &sync.WaitGroup{}

    for i := 0; i < workersNum; i++ {
        wg.Add(1)
        go s.runWorker(
            wg,
            i,
            s.stanConn,
            subject,
            qgroup,
            cb,
            stan.SetManualAckMode(),
            stan.AckWait(ackWait),
            stan.DurableName(durableName),
            stan.MaxInflight(maxInflight),
            stan.DeliverAllAvailable(),
        )
    }
    wg.Wait()
}



Each worker calls the conn.QueueSubscribe method. We pass the subject and the queue group name,
a callback for handling messages and the NATS options as parameters:



func (s *emailSubscriber) runWorker(
    wg *sync.WaitGroup,
    workerID int,
    conn stan.Conn,
    subject string,
    qgroup string,
    cb stan.MsgHandler,
    opts ...stan.SubscriptionOption,
) {
    s.log.Infof("Subscribing worker: %v, subject: %v, qgroup: %v", workerID, subject, qgroup)
    defer wg.Done()

    _, err := conn.QueueSubscribe(subject, qgroup, cb, opts...)
    if err != nil {
        s.log.Errorf("WorkerID: %v, QueueSubscribe: %v", workerID, err)
        if err := conn.Close(); err != nil {
            s.log.Errorf("WorkerID: %v, conn.Close error: %v", workerID, err)
        }
    }
}
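
With a queue group, each message is delivered to only one member of the group. That behaviour can be modelled without a NATS server by letting several goroutines read from one channel. This is a simplified sketch, not how the stan client works internally; runQueueGroup is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// runQueueGroup starts workersNum workers that all read from the same channel,
// so every message is handled by exactly one worker. It returns how many
// messages each worker handled.
func runQueueGroup(msgs <-chan string, workersNum int, handle func(workerID int, msg string)) []int {
	counts := make([]int, workersNum)
	var wg sync.WaitGroup
	for i := 0; i < workersNum; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for m := range msgs {
				handle(id, m)
				counts[id]++ // each worker writes only its own slot
			}
		}(i)
	}
	wg.Wait() // returns once msgs is closed and drained
	return counts
}

func main() {
	msgs := make(chan string)
	go func() {
		defer close(msgs)
		for i := 1; i <= 10; i++ {
			msgs <- fmt.Sprintf("email-%d", i)
		}
	}()

	counts := runQueueGroup(msgs, 3, func(id int, m string) {
		fmt.Printf("worker %d handled %s\n", id, m)
	})

	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("total handled:", total)
}
```

Which worker gets which message is not deterministic, but the total always equals the number of messages sent.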



processCreateEmail handles create email events. It starts a tracing span, increments the metrics counters, then unmarshals the message data and calls the usecase create method.
If that fails, we retry up to 3 times using retry-go. If it still fails, we check whether the current message was redelivered and whether its redelivery count exceeds maxRedeliveryCount (the limit is up to your business logic, here it is 3); if so, we publish an error message and acknowledge the original one.
Error handling can vary a lot and depends on your service's business logic; this example uses the Dead Letter Queue approach.



func (s *emailSubscriber) processCreateEmail(ctx context.Context) stan.MsgHandler {
    return func(msg *stan.Msg) {
        span, ctx := opentracing.StartSpanFromContext(ctx, "emailSubscriber.processCreateEmail")
        defer span.Finish()

        s.log.Infof("subscriber process Create Email: %s", msg.String())
        totalSubscribeMessages.Inc()

        var m models.Email
        if err := json.Unmarshal(msg.Data, &m); err != nil {
            errorSubscribeMessages.Inc()
            s.log.Errorf("json.Unmarshal : %v", err)
            return
        }

        if err := retry.Do(func() error {
            return s.emailUC.Create(ctx, &m)
        },
            retry.Attempts(retryAttempts),
            retry.Delay(retryDelay),
            retry.Context(ctx),
        ); err != nil {
            errorSubscribeMessages.Inc()
            s.log.Errorf("emailUC.Create : %v", err)

            if msg.Redelivered && msg.RedeliveryCount > maxRedeliveryCount {
                if err := s.publishErrorMessage(ctx, msg, err); err != nil {
                    s.log.Errorf("publishErrorMessage : %v", err)
                    return
                }
                if err := msg.Ack(); err != nil {
                    s.log.Errorf("msg.Ack: %v", err)
                    return
                }
            }
            return
        }

        if err := msg.Ack(); err != nil {
            s.log.Errorf("msg.Ack: %v", err)
        }
        successSubscribeMessages.Inc()
    }
}
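
The decision logic of this handler can be separated from NATS and tested on its own. The sketch below uses only the standard library: retryDo is a simplified stand-in for retry-go, and decide, action and errTemporary are hypothetical names. It assumes, as in the handler above, that a message which is not acknowledged is redelivered by the server after AckWait.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a sample error used by the demo.
var errTemporary = errors.New("temporary failure")

// retryDo calls fn up to attempts times, sleeping delay between failures.
// It returns nil on the first success, otherwise the last error.
func retryDo(attempts int, delay time.Duration, fn func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	return err
}

// action is what the subscriber does with a message after processing it.
type action string

const (
	ack        action = "ack"                 // processed, do not redeliver
	redeliver  action = "wait-for-redelivery" // no ack: the server resends later
	deadLetter action = "dead-letter"         // publish to an error subject, then ack
)

// decide mirrors the branching in processCreateEmail.
func decide(processErr error, redelivered bool, redeliveryCount, maxRedeliveryCount uint32) action {
	switch {
	case processErr == nil:
		return ack
	case redelivered && redeliveryCount > maxRedeliveryCount:
		return deadLetter
	default:
		return redeliver
	}
}

func main() {
	calls := 0
	err := retryDo(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	})
	fmt.Println(calls, err, decide(err, false, 0, 3))

	fmt.Println(decide(errTemporary, true, 2, 3))
	fmt.Println(decide(errTemporary, true, 4, 3))
}
```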



For querying data our microservice has GetByID and Search handlers.

You can find the full source code and the list of all tools used here 👨‍💻 :)

I hope this article is useful and helpful. I'll be happy to receive any feedback or questions :)

Top comments (3)

Kalin Staykov

Very nice repo and article! I was looking for an example with gRPC, NATS and Echo and I got just the right level of detail here. For the DB migrations is this the one you use? :

github.com/golang-migrate/migrate

Maybe it's worth mentioning because the Makefile assumes that a "migrate" command is available but I don't see any notes about it.

Alexander

Hi, big thanks, very nice to see that my example is helpful. Yes, here used migrate, already have added it to libraries list. 🙂

Tudor Hulban • Edited

hi,
would refactor runWorker at 6+ params passed.