Manav Kushwaha

Learning Microservices with Go (Part 4): gRPC (Synchronous Communication)

Intro

This post is the fourth part of the "Learning Microservices with Go" series. I'm writing these posts as I learn the concepts. If you haven't checked out part 3, here's the link: Part 3.
Link to the Git repo: GitHub

In part 3, we explored serialization, compared various serialization formats, and implemented some simple benchmarks to measure their performance.

In this part, we'll add gRPC code to our project. gRPC uses Protocol Buffers, which we learnt about in the last part. We'll use gRPC for communication between services instead of the REST APIs we were using earlier.

We'll also test the services using Postman. This was the first time I used Postman to test a gRPC service rather than a plain HTTP API. Every day you learn something new!!

Let's start!

Synchronous Communication

Synchronous communication is a way for applications (e.g. microservices) to interact over the network using the request-response model. One application sends a request to another and then waits until either the response arrives or the request times out. Many protocols work this way, e.g. HTTP.

Request-Response Model

There are also existing Remote Procedure Call (RPC) frameworks and libraries that can help with this. What's an RPC? RPC allows a program on one machine to call a function (subroutine) that runs on another machine as if it were an ordinary local call.

For our use case, we'll use RPCs to call the methods of one service from another service. We'll use gRPC for this.
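
Before bringing in gRPC, here's a small sketch of the RPC idea itself using Go's standard library net/rpc package. This is not gRPC and not what our project uses; the Calculator type and the helper functions are hypothetical names chosen just for this illustration.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
)

// Args is the request payload for the Multiply procedure.
type Args struct{ A, B int }

// Calculator is the type whose methods are exposed over the network.
type Calculator struct{}

// Multiply has the shape net/rpc requires: an exported method with
// two arguments (the second a pointer for the reply) and an error result.
func (Calculator) Multiply(args Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// startServer registers Calculator and serves it on a free local port.
func startServer() (string, error) {
	srv := rpc.NewServer()
	if err := srv.Register(Calculator{}); err != nil {
		return "", err
	}
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	go srv.Accept(lis)
	return lis.Addr().String(), nil
}

// remoteMultiply dials the server and invokes Calculator.Multiply remotely.
func remoteMultiply(addr string, a, b int) (int, error) {
	client, err := rpc.Dial("tcp", addr)
	if err != nil {
		return 0, err
	}
	defer client.Close()

	var product int
	err = client.Call("Calculator.Multiply", Args{A: a, B: b}, &product)
	return product, err
}

func main() {
	addr, err := startServer()
	if err != nil {
		log.Fatal(err)
	}
	product, err := remoteMultiply(addr, 6, 7)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(product) // prints 42
}
```

From the caller's point of view, remoteMultiply looks like a normal function call even though the multiplication happens behind a TCP connection. gRPC gives us the same experience, with generated clients, Protobuf messages and HTTP/2 underneath.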

gRPC

gRPC is an RPC framework developed by Google. It uses HTTP/2 as the transport protocol and Protobufs as the serialization format.

  • It lets you define a service once and generate both client-side and server-side code from that definition. Other services use the generated client code to call the specified methods on this service.
  • It's also language agnostic, i.e. each service can be written in whichever language suits it best.

gRPC is one of the most widely known RPC frameworks used for building microservice architectures.

Defining Service using Protobufs

Let's now see how to define a service using Protobufs. We'll also be defining code for the gRPC framework, which will allow us to talk with other services.

First, install the gRPC plugin for the Protobuf compiler. You can do this with:

go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest 

Now we'll add some code to the api/movie.proto file that we created in the last part.

service MetadataService {
    rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);

    rpc PutMetadata(PutMetadataRequest) returns (PutMetadataResponse);
}
message GetMetadataRequest {
    string movie_id = 1;
}
message GetMetadataResponse {
    Metadata metadata = 1;
}

message PutMetadataRequest {
    Metadata metadata = 1;
}
message PutMetadataResponse {
}

The code we added defines our MetadataService. It reuses the Metadata structure that we defined in the previous part.

Let's walk through the code that we added.
We defined an RPC named GetMetadata that accepts a GetMetadataRequest as its parameter and returns a GetMetadataResponse. We then defined these two messages below the service definition. Note:

  • It's good practice to create a dedicated structure for both the request and the response of each RPC, e.g. GetMetadataRequest and GetMetadataResponse.
  • It's good to follow consistent naming for all the endpoints. We'll prefix the request and response with the function name for all our endpoints.

The same applies to PutMetadata.

Now let's define the RatingService.

service RatingService {
    rpc GetAggregatedRating(GetAggregatedRatingRequest) returns (GetAggregatedRatingResponse);
    rpc PutRating(PutRatingRequest) returns (PutRatingResponse);
}

message GetAggregatedRatingRequest {
    string record_id = 1;
    string record_type = 2;
}

message GetAggregatedRatingResponse {
    double rating_value = 1;
}


message PutRatingRequest {
    string user_id = 1;
    string record_id = 2;
    string record_type = 3;
    int32 record_value = 4;
}
message PutRatingResponse {
}

Similarly, add the code for the MovieService:

service MovieService {
    rpc GetMovieDetails(GetMovieDetailsRequest) returns (GetMovieDetailsResponse);
}

message GetMovieDetailsRequest {
    string movie_id = 1;
}

message GetMovieDetailsResponse {
    MovieDetails movie_details = 1;
}

Now our movie.proto file includes the structure definitions and the API definitions for the services.

We'll run the following command in the root directory to generate the code for these definitions.

protoc -I=api --go_out=. --go-grpc_out=. movie.proto

-I=api sets the directory in which protoc looks for the .proto file and its imports.
--go_out=. sets the output path for the generated Protobuf structure code.
--go-grpc_out=. sets the output path for the generated service code.
movie.proto is the Protobuf file containing the definitions.

After execution, we'll find a movie_grpc.pb.go file in the /gen directory containing the code for the services.

Here's a small screenshot of what it'll look like.
Generated Code

We'll use this generated code in our Go services.

Generated Model vs Internal model

So we now have two versions of our model data structures: (1) the internal ones in metadata/pkg/model and (2) the newly generated ones in the gen directory.
Some of you might think that having two similar structures is redundant (I thought the same at first). But these structures differ in their purpose.

  • Internal Model: The structures that we create to be used across our code base, e.g. in the repository, controller etc.
  • Generated Model: The structures generated by tools like the protoc compiler. These should only be used for serialization, i.e. for transferring the data or storing the serialized data.

We should not use the generated structures across the application for the following reasons:

  1. Coupling between application and serialization format. Switching to another serialization format would require changes across the whole codebase.
  2. Generated code (function definitions etc.) may vary between versions of the generator. This can break any of your code that depends on it.
  3. Generated code is harder to use. E.g. with Protobufs, nested message fields are generated as pointers that may be nil, so using them directly would scatter field != nil checks across the project.

Hence, we need some mapping logic to convert the data from one model to the other. We'll add a file called mapper.go in the directory metadata/pkg/model:

package model

import "movieexample.com/gen"

// MetadataToProto converts a Metadata struct into a generated proto counterpart
func MetadataToProto(m *Metadata) *gen.Metadata {
    return &gen.Metadata{
        Id:          m.ID,
        Title:       m.Title,
        Description: m.Description,
        Director:    m.Director,
    }
}

// MetadataFromProto converts a generated proto counterpart into a Metadata struct
func MetadataFromProto(m *gen.Metadata) *Metadata {
    return &Metadata{
        ID:          m.Id,
        Title:       m.Title,
        Description: m.Description,
        Director:    m.Director,
    }
}
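
The mapper above assumes it is never handed a nil pointer. Here's a hedged sketch of a nil-safe variant together with a quick round-trip check. To keep it self-contained, Metadata and ProtoMetadata below are simplified stand-ins that I made up for the internal model and for the generated gen.Metadata; they are not the real types.

```go
package main

import "fmt"

// Metadata stands in for the internal model in metadata/pkg/model.
type Metadata struct {
	ID, Title, Description, Director string
}

// ProtoMetadata stands in for the generated gen.Metadata message.
type ProtoMetadata struct {
	Id, Title, Description, Director string
}

// toProto converts the internal model to the wire model.
// It returns nil for nil input instead of dereferencing it.
func toProto(m *Metadata) *ProtoMetadata {
	if m == nil {
		return nil
	}
	return &ProtoMetadata{Id: m.ID, Title: m.Title, Description: m.Description, Director: m.Director}
}

// fromProto converts the wire model back to the internal model.
// A response without metadata yields nil rather than a panic.
func fromProto(m *ProtoMetadata) *Metadata {
	if m == nil {
		return nil
	}
	return &Metadata{ID: m.Id, Title: m.Title, Description: m.Description, Director: m.Director}
}

func main() {
	in := &Metadata{ID: "1", Title: "Dune", Description: "Sci-fi", Director: "Denis Villeneuve"}

	// Converting there and back should give us the same data.
	out := fromProto(toProto(in))
	fmt.Println(*out == *in) // prints true

	// nil input is passed through safely.
	fmt.Println(fromProto(nil) == nil) // prints true
}
```

Keeping the nil checks inside the mapper means the rest of the code base only deals with the internal model and doesn't need to know how the generated types behave.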


Implementing clients and gateways

We'll now plug the generated code into our microservices. This lets us switch our communication protocol from JSON over HTTP to Protobuf-based gRPC calls.

In metadata/internal/handler, add a grpc directory and create a grpc.go file in it.

package grpc

import (
    "context"
    "errors"
    "log"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "movieexample.com/gen"
    "movieexample.com/metadata/internal/controller/metadata"
    "movieexample.com/metadata/pkg/model"
)

// Handler defines a movie metadata gRPC handler
type Handler struct {
    gen.UnimplementedMetadataServiceServer
    svc *metadata.Controller
}

// New creates a new movie metadata gRPC handler
func New(ctrl *metadata.Controller) *Handler {
    return &Handler{svc: ctrl}
}

// GetMetadata returns movie metadata by id
func (h *Handler) GetMetadata(ctx context.Context, req *gen.GetMetadataRequest) (*gen.GetMetadataResponse, error) {
    if req == nil || req.MovieId == "" {
        return nil, status.Errorf(codes.InvalidArgument, "nil req or empty id")
    }

    m, err := h.svc.Get(ctx, req.MovieId)
    if err != nil && errors.Is(err, metadata.ErrNotFound) {
        log.Printf("GetMetadata failed: Err: %v", err)
        // status.Error is used so the error text is not treated as a format string.
        return nil, status.Error(codes.NotFound, err.Error())
    } else if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    return &gen.GetMetadataResponse{Metadata: model.MetadataToProto(m)}, nil
}


Key points about the implementation:

  1. Our Handler implements the method with the same signature as the one declared in the generated MetadataServiceServer interface.
  2. We use the MetadataToProto mapping function to transform our internal structure into the generated one.
  3. We embed UnimplementedMetadataServiceServer in our Handler. The generated gRPC code requires this for forward compatibility: any RPC we haven't implemented ourselves (PutMetadata here) falls back to a default that returns an Unimplemented error.

Now we'll update the metadata/cmd/main.go file. Most of the code is from the previous parts

import (
    grpcHandler "movieexample.com/metadata/internal/handler/grpc"
)

func main() {
    // The service will be running on port "port"
    var port int
    flag.IntVar(&port, "port", 8081, "API Handler port")
    flag.Parse()
    log.Printf("Starting the movie metadata service on port %d", port)

    registery, err := consul.NewRegistery("localhost:8500")
    if err != nil {
        panic(err)
    }
    ctx := context.Background()
    instanceID := discovery.GenerateInstanceID(serviceName)

    if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
        panic(err)
    }

    go func() {
        for {
            if err := registery.HealthCheck(instanceID, serviceName); err != nil {
                log.Println("Failed to report healthy state: ", err.Error())
            }
            time.Sleep(1 * time.Second)
        }
    }()
    defer registery.Deregister(ctx, instanceID, serviceName)

    repo := memory.New()

    // Updated code

    svc := metadata.New(repo)
    h := grpcHandler.New(svc)
    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", port))
    if err != nil {
        // Fatalf (not Fatal) is needed for the %v verb to be formatted.
        log.Fatalf("failed to listen: %v", err)
    }
    srv := grpc.NewServer()
    gen.RegisterMetadataServiceServer(srv, h)
    if err := srv.Serve(lis); err != nil {
        panic(err)
    }
}


We instantiated our gRPC server and started listening for requests on it. The rest of the code is the same as before.

Let's do the same with the rating service.
Create a file grpc.go in /rating/internal/handler/grpc

package grpc

import (
    "context"
    "errors"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "movieexample.com/gen"
    "movieexample.com/rating/internal/controller/rating"
    "movieexample.com/rating/pkg/model"
)

// Handler defines a gRPC rating API handler
type Handler struct {
    gen.UnimplementedRatingServiceServer
    svc *rating.Controller
}

// New creates a new gRPC rating API handler
func New(ctrl *rating.Controller) *Handler {
    return &Handler{svc: ctrl}
}

// GetAggregatedRating returns the aggregated rating for a record.
// The name must match the generated interface exactly; otherwise the
// embedded Unimplemented method would answer the call instead.
func (h *Handler) GetAggregatedRating(ctx context.Context, req *gen.GetAggregatedRatingRequest) (*gen.GetAggregatedRatingResponse, error) {
    if req == nil || req.RecordId == "" || req.RecordType == "" {
        return nil, status.Error(codes.InvalidArgument, "nil req or empty record id or record type")
    }
    val, err := h.svc.GetAggregatedRatings(ctx, model.RecordID(req.RecordId), model.RecordType(req.RecordType))
    if err != nil && errors.Is(err, rating.ErrNotFound) {
        return nil, status.Error(codes.NotFound, err.Error())
    } else if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    return &gen.GetAggregatedRatingResponse{RatingValue: val}, nil
}

// PutRating writes a rating for a given record.
func (h *Handler) PutRating(ctx context.Context, req *gen.PutRatingRequest) (*gen.PutRatingResponse, error) {
    if req == nil || req.RecordId == "" || req.UserId == "" {
        return nil, status.Errorf(codes.InvalidArgument, "nil req or empty user id or record id")
    }
    rating := &model.Rating{
        RecordID:   model.RecordID(req.RecordId),
        RecordType: model.RecordType(req.RecordType),
        UserID:     model.UserID(req.UserId),
        Value:      model.RatingValue(req.RecordValue),
    }
    if err := h.svc.PutRating(ctx, model.RecordID(req.RecordId), model.RecordType(req.RecordType), rating); err != nil {
        return nil, err
    }
    return &gen.PutRatingResponse{}, nil
}

Now similar to the metadata service, we'll update the main.go for rating i.e. rating/cmd/main.go

import (
    grpcHandler "movieexample.com/rating/internal/handler/grpc"
)

func main() {
    var port int
    flag.IntVar(&port, "port", 8082, "API Handler port")
    flag.Parse()
    log.Printf("Starting the movie rating service on port %d", port)

    registery, err := consul.NewRegistery("localhost:8500")
    if err != nil {
        panic(err)
    }

    ctx := context.Background()
    instanceID := discovery.GenerateInstanceID(serviceName)

    if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
        panic(err)
    }

    go func() {
        for {
            if err := registery.HealthCheck(instanceID, serviceName); err != nil {
                log.Println("Failed to report healthy state: ", err.Error())
            }
            time.Sleep(1 * time.Second)
        }
    }()
    defer registery.Deregister(ctx, instanceID, serviceName)

    repo := memory.New()
    ctrl := rating.New(repo)

    // New Code

    h := grpcHandler.New(ctrl)

    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    srv := grpc.NewServer()
    gen.RegisterRatingServiceServer(srv, h)
    // Don't ignore the error returned when the server stops.
    if err := srv.Serve(lis); err != nil {
        panic(err)
    }
}

Note: I've left out the other imports to keep this part from getting even longer.

Now add a handler file for the movie service: movie/internal/handler/grpc/grpc.go

package grpc

import (
    "context"
    "log"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "movieexample.com/gen"
    "movieexample.com/metadata/pkg/model"
    "movieexample.com/movie/internal/controller/movie"
)

type Handler struct {
    gen.UnimplementedMovieServiceServer
    ctrl *movie.Controller
}

func New(ctrl *movie.Controller) *Handler {
    return &Handler{ctrl: ctrl}
}

func (h *Handler) GetMovieDetails(ctx context.Context, req *gen.GetMovieDetailsRequest) (*gen.GetMovieDetailsResponse, error) {
    if req == nil || req.MovieId == "" {
        return nil, status.Error(codes.InvalidArgument, "nil req or empty id")
    }
    // Log only after the nil check so that a nil request cannot cause a panic.
    log.Printf("GetMovieDetails called: %v", req.MovieId)

    m, err := h.ctrl.Get(ctx, req.MovieId)
    if err != nil {
        // Map the error returned by the downstream gRPC calls to a status code.
        // The err != nil check matters: status.FromError(nil) also reports ok == true.
        if e, ok := status.FromError(err); ok && e.Code() == codes.NotFound {
            return nil, status.Error(codes.NotFound, err.Error())
        }
        return nil, status.Error(codes.Internal, err.Error())
    }

    return &gen.GetMovieDetailsResponse{
        MovieDetails: &gen.MovieDetails{
            Rating:   *m.Rating,
            Metadata: model.MetadataToProto(&m.Metadata),
        },
    }, nil
}

In the previous few steps, we added server-side logic to handle client requests. We'll now add the client-side code that connects to it.

First, we'll add a file internal/grpcutil/grpcutil.go. It contains the logic for connecting to any service, extracted into a single function.

package grpcutil

import (
    "context"
    "fmt"
    "math/rand"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "movieexample.com/pkg/discovery"
)

// ServiceConnection attempts to select a random service instance and returns a gRPC connection to it.
func ServiceConnection(ctx context.Context, serviceName string, registery discovery.Registery) (*grpc.ClientConn, error) {
    addrs, err := registery.Discover(ctx, serviceName)
    if err != nil {
        return nil, err
    }
    // rand.Intn panics when called with 0, so guard against an empty result.
    if len(addrs) == 0 {
        return nil, fmt.Errorf("no instances of service %q found", serviceName)
    }

    return grpc.Dial(addrs[rand.Intn(len(addrs))], grpc.WithTransportCredentials(insecure.NewCredentials()))
}

This function will pick a random instance of the service and connect to it.

Now we'll create gateways for our services.
Create a file at movie/internal/gateway/metdata/grpc/metadata.go
and add the following code:

package grpc

import (
    "context"

    "movieexample.com/gen"
    "movieexample.com/internal/grpcutil"
    "movieexample.com/metadata/pkg/model"
    "movieexample.com/pkg/discovery"
)

// Gateway defines a gRPC gateway for movie metadata service
type Gateway struct {
    registry discovery.Registery
}

// New creates a new gRPC gateway for movie metadata service
func New(registry discovery.Registery) *Gateway {
    return &Gateway{registry: registry}
}

// Get retrieves movie metadata by movie id
func (g *Gateway) Get(ctx context.Context, id string) (*model.Metadata, error) {
    // Create a gRPC connection to the movie metadata service
    conn, err := grpcutil.ServiceConnection(ctx, "metadata", g.registry)
    if err != nil {
        return nil, err
    }
    defer conn.Close()

    client := gen.NewMetadataServiceClient(conn)
    resp, err := client.GetMetadata(ctx, &gen.GetMetadataRequest{MovieId: id})
    if err != nil {
        return nil, err
    }
    return model.MetadataFromProto(resp.Metadata), nil
}

Some key points:

  1. We use grpcutil.ServiceConnection, the function we just created, to connect to the service.
  2. We convert the metadata from its Protobuf type to our model type using the MetadataFromProto function.

Now we'll create a gateway for the rating service. Create a file /movie/internal/gateway/rating/grpc/rating.go

package grpc

import (
    "context"

    "movieexample.com/gen"
    "movieexample.com/internal/grpcutil"
    "movieexample.com/pkg/discovery"
    "movieexample.com/rating/pkg/model"
)

// Gateway defines a gRPC gateway for rating service
type Gateway struct {
    registry discovery.Registery
}

// New creates a new gRPC gateway for rating service
func New(registry discovery.Registery) *Gateway {
    return &Gateway{registry}
}

// GetAggregatedRating returns the aggregated rating for a record or ErrNotFound if there are no ratings for it.
func (g *Gateway) GetAggregatedRating(ctx context.Context, recordID model.RecordID, recordType model.RecordType) (float64, error) {
    conn, err := grpcutil.ServiceConnection(ctx, "rating", g.registry)
    if err != nil {
        return 0, err
    }
    defer conn.Close()

    client := gen.NewRatingServiceClient(conn)
    resp, err := client.GetAggregatedRating(ctx, &gen.GetAggregatedRatingRequest{RecordId: string(recordID), RecordType: string(recordType)})
    if err != nil {
        return 0, err
    }
    return resp.RatingValue, nil
}

// PutRating adds a rating for a record
func (g *Gateway) PutRating(ctx context.Context, recordID model.RecordID, recordType model.RecordType, userID model.UserID, value model.RatingValue) error {
    conn, err := grpcutil.ServiceConnection(ctx, "rating", g.registry)
    if err != nil {
        return err
    }
    defer conn.Close()

    client := gen.NewRatingServiceClient(conn)
    putRatingRequest := &gen.PutRatingRequest{
        UserId:      string(userID),
        RecordId:    string(recordID),
        RecordType:  string(recordType),
        RecordValue: int32(value),
    }

    _, err = client.PutRating(ctx, putRatingRequest)
    if err != nil {
        return err
    }
    return nil
}

Now let's finally change the main function of the movie service.

func main() {
    var port int
    flag.IntVar(&port, "port", 8083, "Port to listen on")
    flag.Parse()

    // Register with consul
    registery, err := consul.NewRegistery("localhost:8500")
    if err != nil {
        panic(err)
    }

    ctx := context.Background()
    instanceID := discovery.GenerateInstanceID(serviceName)

    if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
        panic(err)
    }

    go func() {
        for {
            if err := registery.HealthCheck(instanceID, serviceName); err != nil {
                log.Println("Failed to report healthy state: ", err.Error())
            }
            time.Sleep(1 * time.Second)
        }
    }()
    defer registery.Deregister(ctx, instanceID, serviceName)

    log.Printf("Starting the movie service at port: %d\n", port)
    metadataGateway := metadataGateway.New(registery)
    ratingGateway := ratingGateway.New(registery)
    ctrl := movie.New(ratingGateway, metadataGateway)

    // New Code

    h := grpcHandler.New(ctrl)
    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    srv := grpc.NewServer()
    reflection.Register(srv)
    gen.RegisterMovieServiceServer(srv, h)
    if err := srv.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

Note: We also registered our server with the reflection package. This lets API request tools (e.g. Postman) discover the methods the server exposes.

That completes the coding part. Now let's test our implementation.

Testing

Run main.go for each service. Remember to start the Consul Docker container before that. I added its configuration to a docker-compose file in the root directory.

services:
  dev-consul:
    image: hashicorp/consul
    command: agent -server -ui -node=server-1 -bootstrap-expect=1 -client=0.0.0.0
    ports:
      - 8500:8500
      - 8600:8600/udp

I also added a Makefile:

consul:
    @docker compose up -d

You can now just run make consul and your Consul service will be up.

Once you've started all the services, open the Postman app. Create a new request and select the gRPC request type.

Postman grpc

The final request can look like this:

Request image

On sending the request, you'll see the following:

gRPC response

We can see that the response is correct: the service returns the Not Found code, just as we defined in our code.

Tadaaaaa. We've completed this part!!! This one was challenging for me personally, probably because I was seeing many things for the first time.
I also felt that JSON is the better choice if you're not worried about scaling and the service isn't going to see very high traffic. JSON is simple and easy to implement and debug.
But Protobufs give a significant advantage over JSON when optimization is important.

Here's the github repo: Github
