Hi, in this article I've tried to make microservice using: π
NATS as message broker
gRPC Go implementation of gRPC
PostgreSQL as database
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana for to compose observability dashboards with everything from Prometheus
MailHog Web and API based SMTP testing
Redis Type-safe Redis client for Golang
Nginx HTTP and reverse proxy server
swag Swagger for Go
migrate for migrations
Source code you can find in GitHub repository
In this article want to make a main accent on NATS Streaming which is written in the Go π€©.
As we can see in documentation it's have nice features:
- Message/event persistence NATS Streaming offers configurable message persistence: in-memory, flat files or database.
- At-least-once-delivery message acknowledgements between publisher and server (for publish operations) and between subscriber and server (to confirm message delivery). Messages are persisted by the server in memory or secondary storage (or other external storage) and will be redelivered to eligible subscribing clients as needed.
- Publisher rate limiting MaxPubAcksInFlight option is effectively limits the number of unacknowledged messages that a publisher may have in-flight at any given time. When this maximum is reached, further async publish calls will block until the number of unacknowledged messages falls below the specified limit.
- Rate matching/limiting per subscriber MaxInFlight option is edesignates the maximum number of outstanding acknowledgements (messages that have been delivered but not acknowledged) that NATS Streaming will allow for a given subscription. When this limit is reached, NATS Streaming will suspend delivery of messages to this subscription until the number of unacknowledged messages falls below the specified limit.
- Durable subscriptions specify a "durable name" which will survive client restarts.
-
Historical message replay by subject New subscriptions may specify a start position in the stream of messages stored for the subscribed subject's channel.By using this option, message delivery may begin at:
- The earliest message stored for this subject
- The most recently stored message for this subject, prior to the start of the current subscription. This is commonly thought of as "last value" or "initial value" caching.
- A specific date/time in nanoseconds
- An historical offset from the current server date/time, e.g. the last 30 seconds.
- A specific message sequence number
For this example i didn't implement any interesting business logic and didn't cover tests,
the microservice send emails and save it's to PostgreSQL, can communicate by NATS, gRPC and REST.
NATS subscribers handle create email events, REST and gRPC used for quering data.
For local development:
make cert // generates tls certificates
make migrate_up // run sql migrations
make swagger // generate swagger documentation
make local or develop // for run docker compose files
For run all in the docker you can run make develop it has hot reloading feature.
UI interfaces will be available on ports:
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3000
NATS UI: http://localhost:8222/
MailHog: http://localhost:8025/
Swagger UI by default will run on: https://localhost:5000/swagger/index.html
In Grafana you need to chose prometheus as metrics source and then create dashboard.
Docker-compose.local.yml for this project:
version: "3.8"
services:
nginx:
container_name: nginx_microservice
ports:
- 8080:8080
- 443:443
build:
context: ./nginx
dockerfile: Dockerfile
networks:
- nats
nats-streaming:
container_name: nats-streaming
image: nats-streaming:latest
ports:
- "8222:8222"
- "4222:4222"
- "6222:6222"
networks: [ "nats" ]
restart: always
command: [
'-p',
'4222',
'-m',
'8222',
'-hbi',
'5s',
'-hbt',
'5s',
'-hbf',
'2',
'-SD',
'-cid',
'microservice',
]
mails_postgesql:
image: postgres:13-alpine
container_name: mails_postgesql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=mails_db
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./mails_pgdata:/var/lib/postgresql/data
networks: [ "nats" ]
mailhog:
container_name: mailhog
image: mailhog/mailhog:latest
ports:
- "1025:1025"
- "8025:8025"
restart: always
networks: [ "nats" ]
redis:
image: redis:6-alpine
restart: always
container_name: user_redis
ports:
- "6379:6379"
networks: [ "nats" ]
prometheus:
container_name: prometheus_container
restart: always
image: prom/prometheus
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:Z
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--storage.tsdb.retention=20d'
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
- '--web.console.templates=/usr/share/prometheus/consoles'
ports:
- '9090:9090'
networks: [ "nats" ]
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "nats" ]
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3000:3000'
networks: [ "nats" ]
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- 5775:5775/udp
- 6831:6831/udp
- 6832:6832/udp
- 5778:5778
- 16686:16686
- 14268:14268
- 14250:14250
- 9411:9411
networks: [ "nats" ]
networks:
nats:
name: nats
At the start of application we load yaml config using viper, initialize all that we need and run application:
// Run start application
func (s *server) Run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
smtpClient := smtp.NewSmtpClient(s.cfg)
publisher := nats.NewPublisher(s.natsConn)
emailPgRepo := repository.NewEmailPGRepository(s.pgxPool)
emailRedisRepo := repository.NewEmailRedisRepository(s.redis)
emailUC := usecase.NewEmailUseCase(s.log, emailPgRepo, publisher, smtpClient, emailRedisRepo)
im := interceptors.NewInterceptorManager(s.log, s.cfg)
mw := middlewares.NewMiddlewareManager(s.log, s.cfg)
validate := validator.New()
go func() {
emailSubscriber := nats.NewEmailSubscriber(s.natsConn, s.log, emailUC, validate)
emailSubscriber.Run(ctx)
}()
go func() {
s.log.Infof("Server is listening on PORT: %s", s.cfg.HTTP.Port)
s.runHttpServer()
}()
metricsServer := echo.New()
go func() {
metricsServer.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
s.log.Infof("Metrics server is running on port: %s", s.cfg.Metrics.Port)
if err := metricsServer.Start(s.cfg.Metrics.Port); err != nil {
s.log.Error(err)
cancel()
}
}()
v1 := s.echo.Group("/api/v1")
v1.Use(mw.Metrics)
emailHandlers := emailsV1.NewEmailHandlers(v1.Group("/email"), emailUC, s.log, validate)
emailHandlers.MapRoutes()
l, err := net.Listen("tcp", s.cfg.GRPC.Port)
if err != nil {
return errors.Wrap(err, "net.Listen")
}
defer l.Close()
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
s.log.Fatalf("failed to load key pair: %s", err)
}
grpcServer := grpc.NewServer(
grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: s.cfg.GRPC.MaxConnectionIdle * time.Minute,
Timeout: s.cfg.GRPC.Timeout * time.Second,
MaxConnectionAge: s.cfg.GRPC.MaxConnectionAge * time.Minute,
Time: s.cfg.GRPC.Timeout * time.Minute,
}),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_opentracing.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
grpcrecovery.UnaryServerInterceptor(),
im.Logger,
),
),
)
emailGRPCService := emailGrpc.NewEmailGRPCService(emailUC, s.log, validate)
emailService.RegisterEmailServiceServer(grpcServer, emailGRPCService)
grpc_prometheus.Register(grpcServer)
s.log.Infof("GRPC Server is listening on port: %s", s.cfg.GRPC.Port)
s.log.Fatal(grpcServer.Serve(l))
if s.cfg.HTTP.Development {
reflection.Register(grpcServer)
}
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
select {
case v := <-quit:
s.log.Errorf("signal.Notify: %v", v)
case done := <-ctx.Done():
s.log.Errorf("ctx.Done: %v", done)
}
if err := s.echo.Server.Shutdown(ctx); err != nil {
return errors.Wrap(err, "echo.Server.Shutdown")
}
if err := metricsServer.Shutdown(ctx); err != nil {
s.log.Errorf("metricsServer.Shutdown: %v", err)
}
grpcServer.GracefulStop()
s.log.Info("Server Exited Properly")
return nil
}
For REST http i used echo, another good popular choice on my opinion is gin,
and swag for generate RESTful API documentation.
Create email handler accept requests, start tracing span, validate input using validator and call usecase method:
// Create Create
// @Tags Emails
// @Summary Create new email
// @Description Create new email and send it
// @Accept json
// @Produce json
// @Success 201 {object} models.Email
// @Router /email [post]
func (h *emailHandlers) Create() echo.HandlerFunc {
return func(c echo.Context) error {
span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "emailHandlers.Create")
defer span.Finish()
createRequests.Inc()
var mail models.Email
if err := c.Bind(&mail); err != nil {
errorRequests.Inc()
h.log.Errorf("c.Bind: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
if err := h.validate.StructCtx(ctx, &mail); err != nil {
errorRequests.Inc()
h.log.Errorf("validate.StructCtx: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
if err := h.emailUC.PublishCreate(ctx, &mail); err != nil {
errorRequests.Inc()
h.log.Errorf("emailUC.PublishCreate: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
successRequests.Inc()
return c.NoContent(http.StatusCreated)
}
}
For gRPC service method idea is the same, but first we need generate proto files:
syntax = "proto3";
import "google/protobuf/timestamp.proto";
//protoc --go_out=plugins=grpc:. *.proto
package emailService;
option go_package = ".;emailService";
message Email {
string EmailID = 1;
string From = 2;
string To = 3;
string Subject = 4;
string Message = 5;
google.protobuf.Timestamp CreatedAt = 6;
}
message Empty {}
message CreateReq {
string From = 1;
string To = 2;
string Subject = 3;
string Message = 4;
}
message CreateRes {
string status = 1;
}
message GetByIDReq {
string EmailID = 1;
}
message GetByIDRes {
Email Email = 1;
}
message SearchReq {
string Search = 1;
int64 page = 2;
int64 size = 3;
}
message SearchRes {
int64 TotalCount = 1;
int64 TotalPages = 2;
int64 Page = 3;
int64 Size = 4;
bool HasMore = 5;
repeated Email Emails = 6;
}
service EmailService {
rpc Create(CreateReq) returns (CreateRes) {}
rpc GetByID(GetByIDReq) returns (GetByIDRes) {}
rpc Search(SearchReq) returns (SearchRes) {}
}
gRPC create email handler business logic is the same as we have in REST:
// Create create email
func (e *emailGRPCService) Create(ctx context.Context, req *emailService.CreateReq) (*emailService.CreateRes, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
defer span.Finish()
createRequests.Inc()
m := &models.Email{
From: req.GetFrom(),
To: req.GetTo(),
Subject: req.GetSubject(),
Message: req.GetMessage(),
}
if err := e.validator.StructCtx(ctx, m); err != nil {
errorRequests.Inc()
e.log.Errorf("validator.StructCtx: %v", err)
return nil, grpcErrors.ErrorResponse(err, err.Error())
}
if err := e.emailUC.Create(ctx, m); err != nil {
errorRequests.Inc()
e.log.Errorf("emailUC.Create: %v", err)
return nil, grpcErrors.ErrorResponse(err, err.Error())
}
successRequests.Inc()
return &emailService.CreateRes{Status: "Ok"}, nil
}
Create email usecase makes the record in the database and publish send email event:
// Create create new email saves in db
func (e *emailUseCase) Create(ctx context.Context, email *models.Email) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "emailUseCase.Create")
defer span.Finish()
created, err := e.emailPGRepo.Create(ctx, email)
if err != nil {
return errors.Wrap(err, "emailPGRepo.Create")
}
mailBytes, err := json.Marshal(created)
if err != nil {
return errors.Wrap(err, "json.Marshal")
}
return e.publisher.Publish(sendEmailSubject, mailBytes)
}
And the repository create method saves data to database.
For interacting with postgres used pgx:
// Create create new email
func (e *emailPGRepository) Create(ctx context.Context, email *models.Email) (*models.Email, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "emailPGRepository.Create")
defer span.Finish()
var mail models.Email
if err := e.db.QueryRow(
ctx,
createEmailQuery,
&email.From,
&email.To,
&email.Subject,
&email.Message,
).Scan(&mail.EmailID, &mail.From, &mail.To, &mail.Subject, &mail.Message, &mail.CreatedAt); err != nil {
return nil, errors.Wrap(err, "Scan")
}
return &mail, nil
}
Jaeger has nice web interface for watch our tracing available on http://localhost:16686
MailHog is good solution for email testing,
and for go smtp client here used go-simple-mail.
NATS out of the box has the very simple web ui:
The Subscriber is most interesting part, first we subcube to subject using worker pool:
// Subscribe subscribe to subject and run workers with given callback for handling messages
func (s *emailSubscriber) Subscribe(subject, qgroup string, workersNum int, cb stan.MsgHandler) {
s.log.Infof("Subscribing to Subject: %v, group: %v", subject, qgroup)
wg := &sync.WaitGroup{}
for i := 0; i <= workersNum; i++ {
wg.Add(1)
go s.runWorker(
wg,
i,
s.stanConn,
subject,
qgroup,
cb,
stan.SetManualAckMode(),
stan.AckWait(ackWait),
stan.DurableName(durableName),
stan.MaxInflight(maxInflight),
stan.DeliverAllAvailable(),
)
}
wg.Wait()
}
Workers execute conn.QueueSubscribe method, we pass subject and queue group name,
callback for handling messages and nats options as parameters:
func (s *emailSubscriber) runWorker(
wg *sync.WaitGroup,
workerID int,
conn stan.Conn,
subject string,
qgroup string,
cb stan.MsgHandler,
opts ...stan.SubscriptionOption,
) {
s.log.Infof("Subscribing worker: %v, subject: %v, qgroup: %v", workerID, subject, qgroup)
defer wg.Done()
_, err := conn.QueueSubscribe(subject, qgroup, cb, opts...)
if err != nil {
s.log.Errorf("WorkerID: %v, QueueSubscribe: %v", workerID, err)
if err := conn.Close(); err != nil {
s.log.Errorf("WorkerID: %v, conn.Close error: %v", workerID, err)
}
}
}
processCreateEmail handling create email events, it's start tracing span, increase metrics counters,
then unmarshal message data, and call usecase create method, if it fails, we retry for 3 times using retry-go,
if it still fails, we check is the current message redelivered and if redelivery count > maxRedeliveryCount(it's up to your business logic, here is 3 times limit), handling error cases can be very different and depends on your service business logic, in this example used Dead Letter Queue approach.
func (s *emailSubscriber) processCreateEmail(ctx context.Context) stan.MsgHandler {
return func(msg *stan.Msg) {
span, ctx := opentracing.StartSpanFromContext(ctx, "emailSubscriber.processCreateEmail")
defer span.Finish()
s.log.Infof("subscriber process Create Email: %s", msg.String())
totalSubscribeMessages.Inc()
var m models.Email
if err := json.Unmarshal(msg.Data, &m); err != nil {
errorSubscribeMessages.Inc()
s.log.Errorf("json.Unmarshal : %v", err)
return
}
if err := retry.Do(func() error {
return s.emailUC.Create(ctx, &m)
},
retry.Attempts(retryAttempts),
retry.Delay(retryDelay),
retry.Context(ctx),
); err != nil {
errorSubscribeMessages.Inc()
s.log.Errorf("emailUC.Create : %v", err)
if msg.Redelivered && msg.RedeliveryCount > maxRedeliveryCount {
if err := s.publishErrorMessage(ctx, msg, err); err != nil {
s.log.Errorf("publishErrorMessage : %v", err)
return
}
if err := msg.Ack(); err != nil {
s.log.Errorf("msg.Ack: %v", err)
return
}
}
return
}
if err := msg.Ack(); err != nil {
s.log.Errorf("msg.Ack: %v", err)
}
successSubscribeMessages.Inc()
}
}
For quering data our microservice has GetByID and Search handlers.
Full source code and list of all used tools you can find here π¨βπ» :)
I hope this article is usefully and helpfully, I'll be happy to receive any feedbacks or questions :)
Top comments (3)
Very nice repo and article! I was looking for an example with gRPC, NATS and Echo and I got just the right level of detail here. For the DB migrations is this the one you use? :
github.com/golang-migrate/migrate
Maybe it's worth mentioning because the Makefile assumes that a "migrate" command is available but I don't see any notes about it.
Hi, big thanks, very nice to see that my example is helpful. Yes, here used migrate, already have added it to libraries list. π
hi,
would refactor runWorker at 6+ params passed.