In the previous lecture, we learned how to implement and test a unary gRPC API in Go. Today we will learn how to implement the second type of gRPC, which is server streaming.
Here's the link to the full gRPC course playlist on Youtube
Github repository: pcbook-go and pcbook-java
Gitlab repository: pcbook-go and pcbook-java
First we will define a new server-streaming RPC in the proto file to search for laptops with some specific requirements. Then we will implement the server and the client, and write a unit test for it.
Alright let’s start!
Add server-streaming RPC definition to Protobuf
Our RPC will allow us to search for laptops that satisfy some configuration requirements. So I will create a filter_message.proto file.
syntax = "proto3";
package techschool.pcbook;
option go_package = "pb";
option java_package = "com.gitlab.techschool.pcbook.pb";
option java_multiple_files = true;
import "memory_message.proto";
message Filter {
double max_price_usd = 1;
uint32 min_cpu_cores = 2;
double min_cpu_ghz = 3;
Memory min_ram = 4;
}
This message will define what kind of laptop we’re looking for, such as:
- The maximum price that we’re willing to pay for the laptop.
- The minimum number of cores that the laptop CPU should have.
- The minimum frequency of the CPU.
- And the minimum size of the RAM.
Then we will define the new server-streaming RPC in the laptop_service.proto file.
We define the SearchLaptopRequest that contains only 1 Filter field, and a SearchLaptopResponse that contains only 1 Laptop field.
message SearchLaptopRequest {
Filter filter = 1;
}
message SearchLaptopResponse {
Laptop laptop = 1;
}
The server-streaming RPC is defined in a similar way to the unary RPC. Start with the rpc keyword, followed by the RPC name, SearchLaptop. The input is SearchLaptopRequest, and the output is a stream of SearchLaptopResponse.
service LaptopService {
rpc CreateLaptop(CreateLaptopRequest) returns (CreateLaptopResponse) {};
rpc SearchLaptop(SearchLaptopRequest) returns (stream SearchLaptopResponse) {};
}
And that’s it! Pretty straightforward.
Let’s generate the code:
make gen
In the laptop_service.pb.go file, some new code has been generated.
We have the SearchLaptopRequest struct and the SearchLaptopResponse struct.
type SearchLaptopRequest struct {
Filter *Filter `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type SearchLaptopResponse struct {
Laptop *Laptop `protobuf:"bytes,1,opt,name=laptop,proto3" json:"laptop,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
Then there is the LaptopServiceClient interface with a new SearchLaptop() function.
type LaptopServiceClient interface {
CreateLaptop(ctx context.Context, in *CreateLaptopRequest, opts ...grpc.CallOption) (*CreateLaptopResponse, error)
SearchLaptop(ctx context.Context, in *SearchLaptopRequest, opts ...grpc.CallOption) (LaptopService_SearchLaptopClient, error)
}
Similarly, we also have a new SearchLaptop() function inside the LaptopServiceServer interface.
type LaptopServiceServer interface {
CreateLaptop(context.Context, *CreateLaptopRequest) (*CreateLaptopResponse, error)
SearchLaptop(*SearchLaptopRequest, LaptopService_SearchLaptopServer) error
}
Add search function to the in-memory store
Before implementing the server, let’s add a new Search() function to the LaptopStore interface.
It takes a filter as input, and also a callback function to report whenever a laptop is found.
type LaptopStore interface {
Save(laptop *pb.Laptop) error
Find(id string) (*pb.Laptop, error)
Search(ctx context.Context, filter *pb.Filter, found func(laptop *pb.Laptop) error) error
}
The context is used to control the deadline/timeout of the request. We will see how it works in a moment.
Now we should implement this Search() function for the InMemoryLaptopStore.
Since we’re reading data, we have to acquire a read lock and release it afterward. We iterate through all laptops in the store and check which ones satisfy the filter.
// Search searches for laptops with filter, returns one by one via the found function
func (store *InMemoryLaptopStore) Search(
ctx context.Context,
filter *pb.Filter,
found func(laptop *pb.Laptop) error,
) error {
store.mutex.RLock()
defer store.mutex.RUnlock()
for _, laptop := range store.data {
if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded {
log.Print("context is cancelled")
return nil
}
if isQualified(filter, laptop) {
other, err := deepCopy(laptop)
if err != nil {
return err
}
err = found(other)
if err != nil {
return err
}
}
}
return nil
}
In the for loop, before checking whether a laptop is qualified, we check whether the context error is Canceled or DeadlineExceeded. If it is, we should return immediately because the request has either timed out or been cancelled by the client, so it's just a waste of time to continue searching.
When the laptop is qualified, we have to deep-copy it before sending it to the caller via the callback function found().
The isQualified() function takes a filter and a laptop as input, and returns true if the laptop satisfies the filter.
func isQualified(filter *pb.Filter, laptop *pb.Laptop) bool {
if laptop.GetPriceUsd() > filter.GetMaxPriceUsd() {
return false
}
if laptop.GetCpu().GetNumberCores() < filter.GetMinCpuCores() {
return false
}
if laptop.GetCpu().GetMinGhz() < filter.GetMinCpuGhz() {
return false
}
if toBit(laptop.GetRam()) < toBit(filter.GetMinRam()) {
return false
}
return true
}
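To see how the four checks combine, here is a small self-contained sketch. The laptop and filter structs are simplified, hypothetical stand-ins for pb.Laptop and pb.Filter, and RAM is stored directly in gigabytes as an assumption to keep the example short.

```go
package main

import "fmt"

// laptop and filter are simplified, hypothetical stand-ins for
// pb.Laptop and pb.Filter. RAM is kept in gigabytes for brevity.
type laptop struct {
	priceUSD float64
	cores    uint32
	minGhz   float64
	ramGB    uint64
}

type filter struct {
	maxPriceUSD float64
	minCores    uint32
	minGhz      float64
	minRamGB    uint64
}

// qualifies applies the same four checks in the same order:
// price ceiling, core count, CPU frequency, and RAM size.
func qualifies(f filter, l laptop) bool {
	if l.priceUSD > f.maxPriceUSD {
		return false
	}
	if l.cores < f.minCores {
		return false
	}
	if l.minGhz < f.minGhz {
		return false
	}
	if l.ramGB < f.minRamGB {
		return false
	}
	return true
}

func main() {
	f := filter{maxPriceUSD: 3000, minCores: 4, minGhz: 2.5, minRamGB: 8}

	fmt.Println(qualifies(f, laptop{priceUSD: 2500, cores: 8, minGhz: 3.0, ramGB: 16})) // true
	fmt.Println(qualifies(f, laptop{priceUSD: 3500, cores: 8, minGhz: 3.0, ramGB: 16})) // false: too expensive
	fmt.Println(qualifies(f, laptop{priceUSD: 2500, cores: 2, minGhz: 3.0, ramGB: 16})) // false: too few cores
}
```

A laptop that sits exactly on a boundary (for example, a price equal to the maximum) still qualifies, because the comparisons are strict.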
Since there are different types of memory units, to compare the RAM, we have to write a function to convert its value to the smallest unit: BIT.
func toBit(memory *pb.Memory) uint64 {
value := memory.GetValue()
switch memory.GetUnit() {
case pb.Memory_BIT:
return value
case pb.Memory_BYTE:
return value << 3 // 8 = 2^3
case pb.Memory_KILOBYTE:
return value << 13 // 1024 * 8 = 2^10 * 2^3 = 2^13
case pb.Memory_MEGABYTE:
return value << 23
case pb.Memory_GIGABYTE:
return value << 33
case pb.Memory_TERABYTE:
return value << 43
default:
return 0
}
}
First we get the memory value. Then we do a switch-case on the memory unit:
- If it is BIT, we simply return the value.
- If it is BYTE, we have to multiply the value by 8 because 1 BYTE = 8 BITs. And because 8 = 2^3, we can use a bit-operator shift-left 3 here.
- If it is KILOBYTE, we have to multiply the value by 1024 * 8 because 1 KILOBYTE = 1024 BYTEs. And because 1024 * 8 = 2^13, we can use a simple shift-left 13 here.
- Similarly, if it is MEGABYTE, we return value shift-left 23.
- For GIGABYTE, value shift-left 33.
- And finally for TERABYTE, value shift-left 43.
- For the default case, just return 0.
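The shift arithmetic can be checked with a small standalone program. This is a sketch only: the unit type and its constants are hypothetical stand-ins for the generated pb.Memory_Unit enum, while the shift amounts are the ones described above.

```go
package main

import "fmt"

// unit is a hypothetical stand-in for the generated memory unit enum.
type unit int

const (
	bit unit = iota
	byteUnit
	kilobyte
	megabyte
	gigabyte
	terabyte
)

// toBits converts a value in the given unit to bits using left shifts.
// Each step up multiplies by 1024 (2^10), and 1 byte is 8 bits (2^3).
func toBits(value uint64, u unit) uint64 {
	switch u {
	case bit:
		return value
	case byteUnit:
		return value << 3
	case kilobyte:
		return value << 13
	case megabyte:
		return value << 23
	case gigabyte:
		return value << 33
	case terabyte:
		return value << 43
	default:
		return 0
	}
}

func main() {
	fmt.Println(toBits(1, byteUnit))                            // 8
	fmt.Println(toBits(1, kilobyte))                            // 8192
	fmt.Println(toBits(4096, megabyte) == toBits(4, gigabyte)) // true
	fmt.Println(toBits(4096, megabyte) < toBits(8, gigabyte))  // true
}
```

Converting both sides to bits makes values in different units directly comparable, for example 4096 MB against 8 GB. One caveat: a uint64 shift silently overflows for very large values (a terabyte value of 2^21 or more), which is far beyond any realistic laptop RAM size.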
OK, the store is done. Now let’s implement the server!
Implement the server
We will have to implement the SearchLaptop() function of the LaptopServiceServer interface. It has 2 arguments: the input request and the output stream response.
// SearchLaptop is a server-streaming RPC to search for laptops
func (server *LaptopServer) SearchLaptop(
req *pb.SearchLaptopRequest,
stream pb.LaptopService_SearchLaptopServer,
) error {
filter := req.GetFilter()
log.Printf("receive a search-laptop request with filter: %v", filter)
err := server.laptopStore.Search(
stream.Context(),
filter,
func(laptop *pb.Laptop) error {
res := &pb.SearchLaptopResponse{Laptop: laptop}
err := stream.Send(res)
if err != nil {
return err
}
log.Printf("sent laptop with id: %s", laptop.GetId())
return nil
},
)
if err != nil {
return status.Errorf(codes.Internal, "unexpected error: %v", err)
}
return nil
}
The first thing we do is get the filter from the request. Then we call server.laptopStore.Search(), passing in the stream context, the filter, and the callback function.
If an error occurs, we return it with the Internal status code; otherwise we return nil.
In the callback function, when we find a laptop, we create a new response object with that laptop and send it to the client by calling stream.Send().
If an error occurs, just return it. Otherwise, we write a simple log saying we have sent the laptop with this ID then return nil.
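The interaction between the search loop and the callback can be sketched without gRPC. Everything in this example is a hypothetical stand-in: search plays the role of the store's Search(), and fakeStream imitates a stream whose Send() starts failing, for example because the client went away.

```go
package main

import (
	"errors"
	"fmt"
)

// search is a hypothetical stand-in for the store's Search method:
// it calls found for every matching item and stops at the first error.
func search(items []int, match func(int) bool, found func(int) error) error {
	for _, item := range items {
		if !match(item) {
			continue
		}
		if err := found(item); err != nil {
			return err
		}
	}
	return nil
}

// fakeStream is a hypothetical stand-in for the generated server stream.
// It records what was sent and fails once failAfter items have been sent.
type fakeStream struct {
	sent      []int
	failAfter int
}

func (s *fakeStream) Send(v int) error {
	if len(s.sent) >= s.failAfter {
		return errors.New("stream closed")
	}
	s.sent = append(s.sent, v)
	return nil
}

func main() {
	isEven := func(n int) bool { return n%2 == 0 }
	stream := &fakeStream{failAfter: 2}

	// The stream's Send method is passed directly as the callback.
	err := search([]int{1, 2, 3, 4, 5, 6}, isEven, stream.Send)
	fmt.Println(stream.sent, err) // [2 4] stream closed
}
```

Because the callback's error is propagated immediately, the search stops at the first failed send instead of scanning the rest of the data.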
And we’re done with the server. Now let’s implement the client!
Implement the client
First I will move the code that creates a random laptop, which we wrote in the previous lecture, into a separate function:
func createLaptop(laptopClient pb.LaptopServiceClient) {
laptop := sample.NewLaptop()
laptop.Id = ""
req := &pb.CreateLaptopRequest{
Laptop: laptop,
}
// set timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := laptopClient.CreateLaptop(ctx, req)
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.AlreadyExists {
// not a big deal
log.Print("laptop already exists")
} else {
log.Fatal("cannot create laptop: ", err)
}
return
}
log.Printf("created laptop with id: %s", res.Id)
}
Then in the main function, we will use a for loop to create 10 random laptops.
func main() {
serverAddress := flag.String("address", "", "the server address")
flag.Parse()
log.Printf("dial server %s", *serverAddress)
conn, err := grpc.Dial(*serverAddress, grpc.WithInsecure())
if err != nil {
log.Fatal("cannot dial server: ", err)
}
laptopClient := pb.NewLaptopServiceClient(conn)
for i := 0; i < 10; i++ {
createLaptop(laptopClient)
}
filter := &pb.Filter{
MaxPriceUsd: 3000,
MinCpuCores: 4,
MinCpuGhz: 2.5,
MinRam: &pb.Memory{Value: 8, Unit: pb.Memory_GIGABYTE},
}
searchLaptop(laptopClient, filter)
}
Then we create a new search filter. Suppose I want to search for laptops with:
- Maximum price of 3000
- At least 4 CPU cores
- Minimum frequency of 2.5 Ghz
- And at least 8 gigabytes of RAM
After that, we call searchLaptop() with this filter. Let’s implement this function!
func searchLaptop(laptopClient pb.LaptopServiceClient, filter *pb.Filter) {
log.Print("search filter: ", filter)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := &pb.SearchLaptopRequest{Filter: filter}
stream, err := laptopClient.SearchLaptop(ctx, req)
if err != nil {
log.Fatal("cannot search laptop: ", err)
}
for {
res, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Fatal("cannot receive response: ", err)
}
laptop := res.GetLaptop()
log.Print("- found: ", laptop.GetId())
log.Print(" + brand: ", laptop.GetBrand())
log.Print(" + name: ", laptop.GetName())
log.Print(" + cpu cores: ", laptop.GetCpu().GetNumberCores())
log.Print(" + cpu min ghz: ", laptop.GetCpu().GetMinGhz())
log.Print(" + ram: ", laptop.GetRam())
log.Print(" + price: ", laptop.GetPriceUsd())
}
}
We first create a context with a timeout of 5 seconds. We make a SearchLaptopRequest object with the input filter. Then we call laptopClient.SearchLaptop() to get the stream.
If there’s an error, we write a fatal log. Otherwise, we use a for loop to receive multiple responses from the stream.
If the stream.Recv() function call returns an end-of-file (EOF) error, this means it’s the end of the stream, so we just return. Otherwise, if the error is not nil, we write a fatal log.
If everything goes well, we can get the laptop from the stream. Here I print out only a few properties of the laptop so that it’s easier to read.
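The receive-until-EOF pattern is independent of gRPC, so it can be sketched with a fake stream. The receiver interface, sliceStream type, and drain function below are hypothetical names introduced for illustration; only the io.EOF convention is taken from the code above.

```go
package main

import (
	"fmt"
	"io"
)

// receiver is a hypothetical, minimal version of a client-side stream.
type receiver interface {
	Recv() (string, error)
}

// sliceStream is a fake stream that yields its items, then io.EOF.
type sliceStream struct {
	items []string
	next  int
}

func (s *sliceStream) Recv() (string, error) {
	if s.next >= len(s.items) {
		return "", io.EOF
	}
	item := s.items[s.next]
	s.next++
	return item, nil
}

// drain reads from the stream until io.EOF, which marks a normal end.
// Any other error is returned together with what was received so far.
func drain(stream receiver) ([]string, error) {
	var got []string
	for {
		item, err := stream.Recv()
		if err == io.EOF {
			return got, nil
		}
		if err != nil {
			return got, err
		}
		got = append(got, item)
	}
}

func main() {
	got, err := drain(&sliceStream{items: []string{"laptop-1", "laptop-2"}})
	fmt.Println(got, err) // [laptop-1 laptop-2] <nil>
}
```

The important detail is the order of the checks: io.EOF is tested first because it signals a clean end of the stream rather than a failure.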
Write unit test
Now I will show you how to write unit tests for this server-streaming RPC.
First I will create a search filter and an in-memory laptop store to insert some laptops for searching.
func TestClientSearchLaptop(t *testing.T) {
t.Parallel()
filter := &pb.Filter{
MaxPriceUsd: 2000,
MinCpuCores: 4,
MinCpuGhz: 2.2,
MinRam: &pb.Memory{Value: 8, Unit: pb.Memory_GIGABYTE},
}
store := service.NewInMemoryLaptopStore()
...
}
Then I make an expectedIDs map that will contain all laptop IDs that we expect to be found by the server.
We use a for loop to create 6 laptops:
- Case 0: unmatched laptop with a too high price.
- Case 1: unmatched because it has only 2 cores.
- Case 2: doesn’t match because the min frequency is too low.
- Case 3: doesn’t match since it has only 4 GB of RAM.
- Case 4 + 5: matched.
func TestClientSearchLaptop(t *testing.T) {
...
expectedIDs := make(map[string]bool)
for i := 0; i < 6; i++ {
laptop := sample.NewLaptop()
switch i {
case 0:
laptop.PriceUsd = 2500
case 1:
laptop.Cpu.NumberCores = 2
case 2:
laptop.Cpu.MinGhz = 2.0
case 3:
laptop.Ram = &pb.Memory{Value: 4096, Unit: pb.Memory_MEGABYTE}
case 4:
laptop.PriceUsd = 1999
laptop.Cpu.NumberCores = 4
laptop.Cpu.MinGhz = 2.5
laptop.Cpu.MaxGhz = laptop.Cpu.MinGhz + 2.0
laptop.Ram = &pb.Memory{Value: 16, Unit: pb.Memory_GIGABYTE}
expectedIDs[laptop.Id] = true
case 5:
laptop.PriceUsd = 2000
laptop.Cpu.NumberCores = 6
laptop.Cpu.MinGhz = 2.8
laptop.Cpu.MaxGhz = laptop.Cpu.MinGhz + 2.0
laptop.Ram = &pb.Memory{Value: 64, Unit: pb.Memory_GIGABYTE}
expectedIDs[laptop.Id] = true
}
err := store.Save(laptop)
require.NoError(t, err)
}
...
}
Then we call store.Save() to save the laptop to the store, and require that there’s no error returned.
Next we have to add this store to the test laptop server. So I will add one more store parameter to the startTestLaptopServer function that we wrote in the previous lecture:
func startTestLaptopServer(t *testing.T, store service.LaptopStore) (*service.LaptopServer, string) {
laptopServer := service.NewLaptopServer(store)
grpcServer := grpc.NewServer()
pb.RegisterLaptopServiceServer(grpcServer, laptopServer)
listener, err := net.Listen("tcp", ":0") // random available port
require.NoError(t, err)
go grpcServer.Serve(listener)
return laptopServer, listener.Addr().String()
}
func newTestLaptopClient(t *testing.T, serverAddress string) pb.LaptopServiceClient {
conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
require.NoError(t, err)
return pb.NewLaptopServiceClient(conn)
}
Then call this function to start the test server, and create a laptop client object with that server address:
func TestClientSearchLaptop(t *testing.T) {
...
_, serverAddress := startTestLaptopServer(t, store)
laptopClient := newTestLaptopClient(t, serverAddress)
req := &pb.SearchLaptopRequest{Filter: filter}
stream, err := laptopClient.SearchLaptop(context.Background(), req)
require.NoError(t, err)
...
}
After that, we create a new SearchLaptopRequest with the filter. Then we call laptopClient.SearchLaptop() with the created request to get back the stream. There should be no errors returned.
Next, I will use the found variable to keep track of the number of laptops found. Then we use a for loop to receive multiple responses from the stream.
func TestClientSearchLaptop(t *testing.T) {
...
found := 0
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
require.NoError(t, err)
require.Contains(t, expectedIDs, res.GetLaptop().GetId())
found += 1
}
require.Equal(t, len(expectedIDs), found)
}
If we get an end-of-file error, we break. Otherwise we check that there’s no error, and that the laptop ID is in the expectedIDs map.
Then we increase the number of laptops found. Finally we require that number to be equal to the size of expectedIDs.
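Note that counting alone cannot tell a duplicate from a missing result: if the server sent one expected laptop twice and skipped another, the count would still match. As a possible stricter variant (not part of the original test), the sketch below also tracks which IDs have been seen. The checkFound helper is a hypothetical name, and plain strings stand in for laptop IDs.

```go
package main

import "fmt"

// checkFound is a hypothetical helper that verifies the received IDs
// are exactly the expected ones: no unknown IDs, no duplicates, none missing.
func checkFound(expected map[string]bool, received []string) error {
	seen := make(map[string]bool, len(received))
	for _, id := range received {
		if !expected[id] {
			return fmt.Errorf("unexpected id %q", id)
		}
		if seen[id] {
			return fmt.Errorf("duplicate id %q", id)
		}
		seen[id] = true
	}
	if len(seen) != len(expected) {
		return fmt.Errorf("found %d of %d expected ids", len(seen), len(expected))
	}
	return nil
}

func main() {
	expected := map[string]bool{"a": true, "b": true}

	fmt.Println(checkFound(expected, []string{"a", "b"})) // <nil>
	fmt.Println(checkFound(expected, []string{"a", "a"})) // duplicate id "a"
}
```

In the real test, the same idea could be expressed by recording each received ID in a map inside the receive loop.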
OK now let’s run this unit test.
It passed.
And that’s all for today’s lecture. We have learned how to implement and test a server-streaming RPC in Go.
Thanks for reading and I will see you in the next article!