DEV Community

Toran Sahu
Toran Sahu

Posted on • Updated on

Part-2: Building a basic microservice with unidirectional-streaming gRPC using Golang

Have you ever wondered while developing a REST API that if the server could have got the capability to stream responses using the same TCP connection? Or, reversely if the REST client could have got the capability to stream the requests to the server, this could have saved the cost of bringing up another service (like WebSocket) just for the sake of fulfilling such requirement.

Then REST isn’t the only API architecture available, and for such use-cases, the gRPC model has begun to play a crucial role. gRPC's unidirectional-streaming RPC feature has got your back on those requirements.

Objective

In this blog, you'll get to know what is client streaming & server streaming uni-directional RPCs. How to implement, test, and run them using a live, fully functional example.

Previously in the part-1 of this blog series, we've learned the basics of gRPC, how to implement a Simple/Unary gRPC, how to write unit tests, how to launch the server & client. part-1 walks you through a step-by-step guide to implement a Stack Machine server & client leveraging Simple/Unary RPC.

If you've missed that, it is highly recommended to go through it to get familiar with the basics of the gRPC framework.

Introduction

Let's understand how Client streaming & Server streaming RPCs works at a very high-level.

Client streaming RPCs where:

  • the client writes a sequence of messages and sends them to the server using a provided stream
  • once the client has finished writing the messages, it waits for the server to read them and return its response

Server streaming RPCs where:

  • the client sends a request to the server and gets a stream to read a sequence of messages back
  • the client reads from the returned stream until there are no more messages

The best thing is gRPC guarantees message ordering within an individual RPC call.

Now let's improve the "Stack Machine" server & client codes to support unidirectional streaming.

Implementing Server Streaming RPC

We'll see an example of Server Streaming first by implementing the FIB operation.

Where the FIB RPC will:

  • perform a Fibonacci operation
  • accept an integer input i.e. generate first N numbers of the Fibonacci series
  • will respond with a stream of integers i.e. first N numbers of the Fibonacci series

And later we'll see how Client Streaming can be implemented so that a client can input a stream of Instructions to the Stack Machine in real-time rather than sending a single request comprised of a set of Instructions.

Update the protobuf

We already have defined the gRPC service Machine and a Simple (Unary) RPC method Execute inside our service definition in part-1 of the blog series. Now, let's update the service definition to add one server streaming RPC called ServerStreamingExecute.

  • A server streaming RPC where the client sends a request to the server using the stub and waits for a response to come back as a stream of result
  • To specify a server-side streaming method, need to place the stream keyword before the response type
// ServerStreamingExecute accepts a set of Instructions from client and returns a stream of Result.
rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
Enter fullscreen mode Exit fullscreen mode
source: machine/machine.proto

Generating the updated client and server interface Go code

We need to generate the gRPC client and server interfaces from our machine/machine.proto service definition.

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
  -I=$SRC_DIR \
  --go_out=plugins=grpc:$DST_DIR \
  $SRC_DIR/machine/machine.proto
Enter fullscreen mode Exit fullscreen mode

You can observe that the declaration of ServerStreamingExecute() in the MachineClient and MachineServer interface has been auto-generated:

...

 type MachineClient interface {
    Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)
+   ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
 }

...

 type MachineServer interface {
    Execute(context.Context, *InstructionSet) (*Result, error)
+   ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
 }
Enter fullscreen mode Exit fullscreen mode
source: machine/machine.pb.go

Update the Server

Just in case if you're wondering, What if my service doesn't implement some of the RPCs declared in the machine.pb.go file, then you'll encounter the following error while launching your gRPC server.

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go
# command-line-arguments
cmd/run_machine_server.go:32:44: cannot use &server.MachineServer literal (type *server.MachineServer) as type machine.MachineServer in argument to machine.RegisterMachineServer:
        *server.MachineServer does not implement machine.MachineServer (missing ServerStreamingExecute method)

Enter fullscreen mode Exit fullscreen mode

So, it's always the best practice to keep your service in sync with the service definition i.e. machine/machine.proto & machine/machine.pb.go. If you do not want to support a particular RPC, or its implementation is not yet ready, just respond with Unimplemented error status. Example:

// ServerStreamingExecute runs the set of instructions given and streams a sequence of Results.
func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {
    return status.Error(codes.Unimplemented, "ServerStreamingExecute() not implemented yet")
}
Enter fullscreen mode Exit fullscreen mode
source: server/machine.go

Before we implement the ServerStreamingExecute() RPC, let's write a Fibonacci series generator called FibonacciRange().

package utils

func FibonacciRange(n int) <-chan int {
    ch := make(chan int)
    fn := make([]int, n+1, n+2)
    fn[0] = 0
    fn[1] = 1
    go func() {
        defer close(ch)
        for i := 0; i <= n; i++ {
            var f int
            if i < 2 {
                f = fn[i]
            } else {
                f = fn[i-1] + fn[i-2]
            }
            fn[i] = f
            ch <- f
        }
    }()
    return ch
}
Enter fullscreen mode Exit fullscreen mode
source: utils/fibonacci.go

The blog series assumes that you're familiar with Golang basics & its concurrency paradigms & concepts like Channels. You can read more about the Channels from the official document.

This function yields the numbers of Fibonacci series till the Nth position.

Let's also add a small unit test to validate the FibonacciRange() generator.

package utils

import (
    "testing"
)

func TestFibonacciRange(t *testing.T) {
    fibOf5 := []int{0, 1, 1, 2, 3, 5}
    i := 0
    for f := range FibonacciRange(5) {
        if f != fibOf5[i] {
            t.Errorf("got %d, want %d", f, fibOf5[i])
        }
        i++
    }
}
Enter fullscreen mode Exit fullscreen mode
source: utils/fibonacci_test.go

Let's implement ServerStreamingExecute() to handle the basic instructions PUSH/POP, and FIB with proper error handling. On completion of the execution of instructions set, it should POP the result from the Stack and should respond with a Result object to the client.

func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {
    if len(instructions.GetInstructions()) == 0 {
        return status.Error(codes.InvalidArgument, "No valid instructions received")
    }

    var stack stack.Stack

    for _, instruction := range instructions.GetInstructions() {
        operand := instruction.GetOperand()
        operator := instruction.GetOperator()
        op_type := OperatorType(operator)

        log.Printf("Operand: %v, Operator: %v\n", operand, operator)

        switch op_type {
        case PUSH:
            stack.Push(float32(operand))
        case POP:
            stack.Pop()
        case FIB:
            n, popped := stack.Pop()

            if !popped {
                return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
            }

            if op_type == FIB {
                for f := range utils.FibonacciRange(int(n)) {
                    log.Println(float32(f))
                    stream.Send(&machine.Result{Output: float32(f)})
                }
            }
        default:
            return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
        }
    }
    return nil
}
Enter fullscreen mode Exit fullscreen mode
source: server/machine.go

Update the Client

Now, update the client code to call ServerStreamingExecute() where the client will be receiving numbers of the Fibonacci series through the stream and print the same.

func runServerStreamingExecute(client machine.MachineClient, instructions *machine.InstructionSet) {
    log.Printf("Executing %v", instructions)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.ServerStreamingExecute(ctx, instructions)
    if err != nil {
        log.Fatalf("%v.Execute(_) = _, %v: ", client, err)
    }
    for {
        result, err := stream.Recv()
        if err == io.EOF {
            log.Println("EOF")
            break
        }
        if err != nil {
            log.Printf("Err: %v", err)
            break
        }
        log.Printf("output: %v", result.GetOutput())
    }
    log.Println("DONE!")
}
Enter fullscreen mode Exit fullscreen mode
source: client/machine.go

Test

To write the unit test we'll need to generate the mock of multiple interface as required.
mockgen is the ready-to-go framework for mocking in Golang, so we'll be leveraging it in our unit tests.

Server

As we've upgdated our interface i.e. machine/machine.pb.go, let's update the mock for MachineClient interface. And as we've introduced a new RPC ServerStreamingExecute(), let's generate the mock for ServerStream interface Machine_ServerStreamingExecuteServer as well.

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer > mock_machine/machine_mock.go
Enter fullscreen mode Exit fullscreen mode

The updated mock_machine/machine_mock.go should look like this.

Now, we're good to write unit test for server-side streaming RPC ServerStreamingExecute():

func TestServerStreamingExecute(t *testing.T) {
    s := MachineServer{}

    // set up test table
    tests := []struct {
        instructions []*machine.Instruction
        want         []float32
    }{
        {
            instructions: []*machine.Instruction{
                {Operand: 5, Operator: "PUSH"},
                {Operator: "FIB"},
            },
            want: []float32{0, 1, 1, 2, 3, 5},
        },
        {
            instructions: []*machine.Instruction{
                {Operand: 6, Operator: "PUSH"},
                {Operator: "FIB"},
            },
            want: []float32{0, 1, 1, 2, 3, 5, 8},
        },
    }

    ctrl := gomock.NewController(t)
    defer ctrl.Finish()
    mockServerStream := mock_machine.NewMockMachine_ServerStreamingExecuteServer(ctrl)
    for _, tt := range tests {
        mockResults := []*machine.Result{}
        mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn(
            func(result *machine.Result) error {
                mockResults = append(mockResults, result)
                return nil
            }).AnyTimes()

        req := &machine.InstructionSet{Instructions: tt.instructions}

        err := s.ServerStreamingExecute(req, mockServerStream)
        if err != nil {
            t.Errorf("ServerStreamingExecute(%v) got unexpected error: %v", req, err)
        }
        for i, result := range mockResults {
            got := result.GetOutput()
            want := tt.want[i]
            if got != want {
                t.Errorf("got %v, want %v", got, want)
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode
source: server/machine_test.go

Let's run the unit test:

~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok      command-line-arguments  0.003s
Enter fullscreen mode Exit fullscreen mode

Client

For our new RPC ServerStreamingExecute(), let's add the mock for ClientStream interface Machine_ServerStreamingExecuteClient as well.

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer,Machine_ServerStreamingExecuteClient > mock_machine/machine_mock.go
Enter fullscreen mode Exit fullscreen mode
source: mock_machine/machine_mock.go

Let's add unit test to test client-side logic for server-side streaming RPC ServerStreamingExecute() using mock MockMachine_ServerStreamingExecuteClient :

func TestServerStreamingExecute(t *testing.T) {
    instructions := []*machine.Instruction{}
    instructions = append(instructions, &machine.Instruction{Operand: 1, Operator: "PUSH"})
    instructions = append(instructions, &machine.Instruction{Operator: "FIB"})
    instructionSet := &machine.InstructionSet{Instructions: instructions}

    ctrl := gomock.NewController(t)
    defer ctrl.Finish()
    mockMachineClient := mock_machine.NewMockMachineClient(ctrl)
    clientStream := mock_machine.NewMockMachine_ServerStreamingExecuteClient(ctrl)

    clientStream.EXPECT().Recv().Return(&machine.Result{Output: 0}, nil)

    mockMachineClient.EXPECT().ServerStreamingExecute(
        gomock.Any(),   // context
        instructionSet, // rpc uniary message
    ).Return(clientStream, nil)

    if err := testServerStreamingExecute(t, mockMachineClient, instructionSet); err != nil {
        t.Fatalf("Test failed: %v", err)
    }
}
Enter fullscreen mode Exit fullscreen mode
source: mock_machine/machine_mock_test.go

Let's run the unit test:

~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok      command-line-arguments  0.003s
Enter fullscreen mode Exit fullscreen mode

Run

As we are assured through unit tests that the business logic of the server & client codes is working as expected, let’s try running the server and communicating to it via our client code.

Server

To start the server we need to run the previously created cmd/run_machine_server.go file.

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go
Enter fullscreen mode Exit fullscreen mode

Client

Now, let’s run the client code client/machine.go.

~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Executing instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >
output:30
Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >
output: 0
output: 1
output: 1
output: 2
output: 3
output: 5
output: 8
EOF
DONE!
Enter fullscreen mode Exit fullscreen mode

Awesome! A Server Streaming RPC has been successfully implemented.


Implementing Client Streaming RPC

We have learned how to implement a Server Streaming RPC, now it's time to explore the Client Streaming RPC.
To do so, we'll not introduce another RPC, rather we'll update the existing Execute() RPC to accept a stream of Instructions from the client in real-time rather than sending a single request comprised of a set of Instructions.

Update the protobuf

So, let's update the interface:

 service Machine {
-     rpc Execute(InstructionSet) returns (Result) {}
+     rpc Execute(stream Instruction) returns (Result) {}
      rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
 }
Enter fullscreen mode Exit fullscreen mode
source: machine/machine.proto

Generating the updated client and server interface Go code

Now lets generate an updated golang code from the machine/machine.proto by running:

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
  -I=$SRC_DIR \
  --go_out=plugins=grpc:$DST_DIR \
  $SRC_DIR/machine/machine.proto
Enter fullscreen mode Exit fullscreen mode

You'll notice that declaration of Execute() has been updated from MachineServer & MachineClient interfaces.

 type MachineServer interface {
-   Execute(context.Context, *InstructionSet) (*Result, error)
+   Execute(Machine_ExecuteServer) error
    ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
 }

 type MachineClient interface {
-    Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)
+    Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error)
     ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
 }
Enter fullscreen mode Exit fullscreen mode
source: machine/machine.pb.go

Update the Server

Let's update the server code to make Execute() a client streaming uni-directional RPC so that it should be able to accept stream the instructions from the client and respond with a Result struct.

func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error {
    var stack stack.Stack
    for {
        instruction, err := stream.Recv()
        if err == io.EOF {
            log.Println("EOF")
            output, popped := stack.Pop()
            if !popped {
                return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
            }

            if err := stream.SendAndClose(&machine.Result{
                Output: output,
            }); err != nil {
                return err
            }

            return nil
        }
        if err != nil {
            return err
        }

        operand := instruction.GetOperand()
        operator := instruction.GetOperator()
        op_type := OperatorType(operator)

        fmt.Printf("Operand: %v, Operator: %v\n", operand, operator)

        switch op_type {
        case PUSH:
            stack.Push(float32(operand))
        case POP:
            stack.Pop()
        case ADD, SUB, MUL, DIV:
            item2, popped := stack.Pop()
            item1, popped := stack.Pop()

            if !popped {
                return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
            }

            if op_type == ADD {
                stack.Push(item1 + item2)
            } else if op_type == SUB {
                stack.Push(item1 - item2)
            } else if op_type == MUL {
                stack.Push(item1 * item2)
            } else if op_type == DIV {
                stack.Push(item1 / item2)
            }

        default:
            return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
        }
    }
}
Enter fullscreen mode Exit fullscreen mode
source: server/machine.go

Update the Client

Now update the client code to make client.Execute() a uni-directional streaming RPC, so that the client can stream the instructions to the server and can receive a Result struct once the streaming completes.

func runExecute(client machine.MachineClient, instructions *machine.InstructionSet) {
    log.Printf("Streaming %v", instructions)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.Execute(ctx)
    if err != nil {
        log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)
    }
    for _, instruction := range instructions.GetInstructions() {
        if err := stream.Send(instruction); err != nil {
            log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)
        }
    }
    result, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
    }
    log.Println(result)
}
Enter fullscreen mode Exit fullscreen mode
source: client/machine.go

Test

Generate mock for Machine_ExecuteClient and Machine_ExecuteServer interface to test client-streaming RPC Execute():

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteClient,Machine_ServerStreamingExecuteServer,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go
Enter fullscreen mode Exit fullscreen mode

The updated mock_machine/machine_mock.go should look like this.

Server

Let's update the unit test to test the server-side logic of client streaming Execute() RPC using mock:

func TestExecute(t *testing.T) {
    s := MachineServer{}

    ctrl := gomock.NewController(t)
    defer ctrl.Finish()
    mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl)

    mockResult := &machine.Result{}
    callRecv1 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 5, Operator: "PUSH"}, nil)
    callRecv2 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 6, Operator: "PUSH"}, nil).After(callRecv1)
    callRecv3 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil).After(callRecv2)
    mockServerStream.EXPECT().Recv().Return(nil, io.EOF).After(callRecv3)
    mockServerStream.EXPECT().SendAndClose(gomock.Any()).DoAndReturn(
        func(result *machine.Result) error {
            mockResult = result
            return nil
        })

    err := s.Execute(mockServerStream)
    if err != nil {
        t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err)
    }
    got := mockResult.GetOutput()
    want := float32(30)
    if got != want {
        t.Errorf("got %v, wanted %v", got, want)
    }
}
Enter fullscreen mode Exit fullscreen mode
source: server/machine_test.go

Let's run the unit test:

~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok      command-line-arguments  0.003s
Enter fullscreen mode Exit fullscreen mode

Client

Now, add unit test to test client-side logic of client streaming Execute() RPC using mock:

func TestExecute(t *testing.T) {
    ctrl := gomock.NewController(t)
    defer ctrl.Finish()
    mockMachineClient := mock_machine.NewMockMachineClient(ctrl)

    mockClientStream := mock_machine.NewMockMachine_ExecuteClient(ctrl)
    mockClientStream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
    mockClientStream.EXPECT().CloseAndRecv().Return(&machine.Result{Output: 30}, nil)

    mockMachineClient.EXPECT().Execute(
        gomock.Any(), // context
    ).Return(mockClientStream, nil)

    testExecute(t, mockMachineClient)
}
Enter fullscreen mode Exit fullscreen mode
source: mock_machine/machine_mock_test.go

Let's run the unit test:

~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok      command-line-arguments  0.003s
Enter fullscreen mode Exit fullscreen mode

Run all the unit tests at once:

~/disk/E/workspace/grpc-eg-go
$ go test ./...
?       github.com/toransahu/grpc-eg-go/client  [no test files]
?       github.com/toransahu/grpc-eg-go/cmd     [no test files]
?       github.com/toransahu/grpc-eg-go/machine [no test files]
ok      github.com/toransahu/grpc-eg-go/mock_machine    (cached)
ok      github.com/toransahu/grpc-eg-go/server  (cached)
ok      github.com/toransahu/grpc-eg-go/utils   (cached)
?       github.com/toransahu/grpc-eg-go/utils/stack     [no test files]
Enter fullscreen mode Exit fullscreen mode

Run

Now we are assured through unit tests that the business logic of the server & client codes is working as expected, let’s try running the server and communicating to it via our client code.

Server

To launch the server we need to run the previously created cmd/run_machine_server.go file.

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go
Enter fullscreen mode Exit fullscreen mode

Client

Now, let’s run the client code client/machine.go.

~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Streaming instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >
output:30
Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >
output: 0
output: 1
output: 1
output: 2
output: 3
output: 5
output: 8
EOF
DONE!
Enter fullscreen mode Exit fullscreen mode

Awesome!! We have successfully transformed a Unary RPC into Server Streaming RPC.


At the end of this blog, we’ve learned:

  • How to define an interface for uni-directional streaming RPCs using protobuf
  • How to write gRPC server & client logic for uni-directional streaming RPCs
  • How to write and run the unit test for server-streaming & client-streaming RPCs
  • How to run the gRPC server and a client can communicate to it

The source code of this example is available at toransahu/grpc-eg-go.
You can also git checkout to this commit SHA for Part-2(a) and to this commit SHA for Part-2(b).

See you in the next part of this blog series.

Top comments (1)

Collapse
 
helicopterinhand profile image
Russell

Hi Toran, thanks so much for this tutorial!!

I am quite new to gRPC and have a few questions.

  1. For the ServerStreamingExecute function, let's say the stream.Send has an error and I want to handle it. Should I just log.Print it or should I return an error in the response?

  2. Also for the ServerStreamingExecute function, the second argument it accepts stream is a type interface rather than type struct. The interface is called
    Machine_ServerStreamingExecuteServer
    while the struct is called
    machineServerStreamingExecuteServer.
    Why do we have to use the type interface here?