Welcome back to the gRPC course!
To recall, there are 4 types of gRPC. In the previous lectures, we've learned how to implement unary RPC and server-streaming RPC in Golang.
Today we will learn how to implement and test the 3rd type of gRPC, which is client-streaming. Specifically, we will build an API to upload an image file to the server in multiple chunks.
Here's the link to the full gRPC course playlist on YouTube
GitHub repository: pcbook-go and pcbook-java
GitLab repository: pcbook-go and pcbook-java
1. Define client-streaming RPC in proto file
Let's define the new RPC in the laptop_service.proto file.
First, we need an UploadImageRequest message:
message UploadImageRequest {
oneof data {
ImageInfo info = 1;
bytes chunk_data = 2;
};
}
The idea is to divide the image file into multiple chunks and send them to the server one by one, each in its own request message. I use a oneof field here because the first request will only contain the metadata, or some basic information about the image. Then the following requests will contain the image data chunks.
The ImageInfo message will have 2 string fields: the laptop ID and the image type, such as ".jpg" or ".png".
message ImageInfo {
string laptop_id = 1;
string image_type = 2;
}
Then we define an UploadImageResponse message, which will be returned to the client once the server has received all chunks of the image:
message UploadImageResponse {
string id = 1;
uint32 size = 2;
}
This message contains the ID of the image which is generated by the server, and the total size of the uploaded image in bytes.
OK, now we define the UploadImage RPC. This is a client-streaming RPC, so it takes a stream of UploadImageRequest messages as input and returns a single UploadImageResponse.
service LaptopService {
...
rpc UploadImage(stream UploadImageRequest) returns (UploadImageResponse) {};
}
Alright, now let’s run make gen to generate the code.
After the code is successfully generated, we will see some errors in the code because the laptop server hasn’t implemented the UploadImage() method that is required by the LaptopServiceServer interface.
The errors will be fixed after we implement the server.
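Before moving on, it may help to see roughly how a oneof field shows up in Go. The sketch below is a hand-written stand-in, not the generated code: the type names are simplified and the describe() helper is hypothetical. It only illustrates the idea that the field holds one of two wrapper types, and that a type switch can tell which one is set.

```go
package main

import "fmt"

// Hand-written stand-ins for illustration only; the real types are
// generated by protoc and use different names.
type ImageInfo struct {
	LaptopId  string
	ImageType string
}

// isUploadImageRequestData is implemented only by the types allowed in the oneof.
type isUploadImageRequestData interface{ isData() }

type UploadImageRequestInfo struct{ Info *ImageInfo }
type UploadImageRequestChunkData struct{ ChunkData []byte }

func (*UploadImageRequestInfo) isData()      {}
func (*UploadImageRequestChunkData) isData() {}

type UploadImageRequest struct {
	Data isUploadImageRequestData // holds at most one of the two wrappers
}

// describe reports which oneof case is set (hypothetical helper).
func describe(req *UploadImageRequest) string {
	switch d := req.Data.(type) {
	case *UploadImageRequestInfo:
		return fmt.Sprintf("info: laptop=%s type=%s", d.Info.LaptopId, d.Info.ImageType)
	case *UploadImageRequestChunkData:
		return fmt.Sprintf("chunk: %d bytes", len(d.ChunkData))
	default:
		return "empty"
	}
}

func main() {
	first := &UploadImageRequest{
		Data: &UploadImageRequestInfo{Info: &ImageInfo{LaptopId: "abc", ImageType: ".jpg"}},
	}
	next := &UploadImageRequest{
		Data: &UploadImageRequestChunkData{ChunkData: make([]byte, 1024)},
	}
	fmt.Println(describe(first))
	fmt.Println(describe(next))
}
```

Because a request carries either the info or a chunk, never both, the server can treat the first message as metadata and every later message as image data.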
2. Implement the server
Let’s open the laptop_server.go file and add the UploadImage() function to the LaptopServer struct. We can easily find its signature inside the generated laptop_service.pb.go file. Just copy and paste it to the laptop_server.go file:
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
return nil
}
Let’s return nil for now. We will come back to this function later after implementing the image store.
2.1 Implement the image store
The role of the store is to save the uploaded image file somewhere on the server or in the cloud. To keep it generic and easy to swap for a different type of storage, we define ImageStore as an interface.
type ImageStore interface {
Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error)
}
It has 1 function to save a laptop image, which takes 3 input parameters: the laptop ID, the image type, and the image data given by a bytes buffer. And it will return the ID of the saved image, or an error.
Next we will implement this interface with a DiskImageStore, which will save image files to the disk and keep their information in memory.
type DiskImageStore struct {
mutex sync.RWMutex
imageFolder string
images map[string]*ImageInfo
}
Similar to the laptop store, we need a mutex to handle concurrency. Then we need the path of the folder to save laptop images. And finally, a map whose key is the image ID and whose value is some information about the image.
The ImageInfo contains 3 fields: the ID of the laptop, the type of the image (or its file extension: jpg/png), and the path to the image file on disk.
type ImageInfo struct {
LaptopID string
Type string
Path string
}
Let’s write a function to create a new DiskImageStore. It has only 1 input, which is the image folder. And inside, we just need to initialize the images map:
func NewDiskImageStore(imageFolder string) *DiskImageStore {
return &DiskImageStore{
imageFolder: imageFolder,
images: make(map[string]*ImageInfo),
}
}
Now we have to implement the Save() function, which is required by the ImageStore interface.
First we have to generate a new random UUID for the image. We make the path to store the image by joining the image folder, image ID, and image type.
Then we call os.Create() to create the file. And we call imageData.WriteTo() to write the image data to the created file:
func (store *DiskImageStore) Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error) {
imageID, err := uuid.NewRandom()
if err != nil {
return "", fmt.Errorf("cannot generate image id: %w", err)
}
imagePath := fmt.Sprintf("%s/%s%s", store.imageFolder, imageID, imageType)
file, err := os.Create(imagePath)
if err != nil {
return "", fmt.Errorf("cannot create image file: %w", err)
}
// release the file handle when Save() returns
defer file.Close()
_, err = imageData.WriteTo(file)
if err != nil {
return "", fmt.Errorf("cannot write image to file: %w", err)
}
store.mutex.Lock()
defer store.mutex.Unlock()
store.images[imageID.String()] = &ImageInfo{
LaptopID: laptopID,
Type: imageType,
Path: imagePath,
}
return imageID.String(), nil
}
If the file is written successfully, we need to save its information to the in-memory map. So we have to acquire the write lock of the store.
Then we save the image information to the map, where the key is the ID of the image and the value contains the laptop ID, the image type, and the path to the image file.
Finally we return the image ID to the caller. And that’s it, we’re done with the image store. Now let’s go back to the server.
2.2 Implement the UploadImage RPC
We need to add the new image store to the LaptopServer struct, and add the imageStore as the second parameter of the NewLaptopServer() function:
type LaptopServer struct {
laptopStore LaptopStore
imageStore ImageStore
}
func NewLaptopServer(laptopStore LaptopStore, imageStore ImageStore) *LaptopServer {
return &LaptopServer{laptopStore, imageStore}
}
Then in the main.go file of the server, we also need to pass 2 stores into the NewLaptopServer() function: one is the laptop store, and the other is the image store. Let's say we will save the uploaded images to the "img" folder.
func main() {
...
laptopStore := service.NewInMemoryLaptopStore()
imageStore := service.NewDiskImageStore("img")
laptopServer := service.NewLaptopServer(laptopStore, imageStore)
...
}
Now let’s implement the UploadImage() function.
First we call stream.Recv() to receive the first request, which contains the metadata information of the image. If there’s an error, we write a log and return the status code Unknown to the client.
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
req, err := stream.Recv()
if err != nil {
return logError(status.Errorf(codes.Unknown, "cannot receive image info"))
}
...
}
func logError(err error) error {
if err != nil {
log.Print(err)
}
return err
}
To be concise and not repeat myself, here I define a logError() function to log the error before returning it. It only prints a log if the error is not nil, and always returns the error to the caller.
Next we can get the laptop ID and the image type from the request. Let’s write a log here saying that we have received the upload-image request with this laptop ID and image type.
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
laptopID := req.GetInfo().GetLaptopId()
imageType := req.GetInfo().GetImageType()
log.Printf("receive an upload-image request for laptop %s with image type %s", laptopID, imageType)
...
}
Before saving the laptop image, we have to make sure that the laptop ID really exists. So we call server.laptopStore.Find() to find the laptop by ID.
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
laptop, err := server.laptopStore.Find(laptopID)
if err != nil {
return logError(status.Errorf(codes.Internal, "cannot find laptop: %v", err))
}
if laptop == nil {
return logError(status.Errorf(codes.InvalidArgument, "laptop id %s doesn't exist", laptopID))
}
...
}
If we get an error, we just log and return it with the Internal status code. Else, if the laptop is nil, which means it is not found, we log and return an error with the InvalidArgument status code, or you might use the NotFound code to be more precise.
Now if everything goes well and the laptop is found, we can start receiving the image data chunks. So let’s create a new byte buffer to store them, and also a variable to keep track of the total image size.
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
imageData := bytes.Buffer{}
imageSize := 0
for {
log.Print("waiting to receive more data")
req, err := stream.Recv()
if err == io.EOF {
log.Print("no more data")
break
}
if err != nil {
return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
}
...
}
...
}
Since we’re going to receive many requests from the stream, I use a for loop here. And inside the loop, we call stream.Recv() to get the request.
But this time, we first check if the error is EOF or not. If it is, this means that no more data will be sent, and we can safely break the loop. Else, if the error is still not nil, we return it with the Unknown status code to the client.
Otherwise, if there’s no error, we can get the chunk data from the request. We get its size using the len() function, and add this size to the total image size.
Let’s say we don’t want the client to send an image that is too large, so we check if the image size is greater than the maximum size, let's say 1 MB as defined by the constant maxImageSize (1 MB = 2^20 bytes = 1 << 20 bytes).
const maxImageSize = 1 << 20
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
imageData := bytes.Buffer{}
imageSize := 0
for {
...
chunk := req.GetChunkData()
size := len(chunk)
log.Printf("received a chunk with size: %d", size)
imageSize += size
if imageSize > maxImageSize {
return logError(status.Errorf(codes.InvalidArgument, "image is too large: %d > %d", imageSize, maxImageSize))
}
_, err = imageData.Write(chunk)
if err != nil {
return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
}
}
...
}
Now if the image size is greater than the max image size, we can return an error with the InvalidArgument status code, and a message saying the image is too large. Else, we can append the chunk to the image data with the Write() function. We also log and return the Internal status code if an error occurs.
After the for loop, we have collected all data of the image in the buffer. So we can call imageStore.Save() to save the image data to the store and get back the image ID:
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
for {
...
}
imageID, err := server.imageStore.Save(laptopID, imageType, imageData)
if err != nil {
return logError(status.Errorf(codes.Internal, "cannot save image to the store: %v", err))
}
res := &pb.UploadImageResponse{
Id: imageID,
Size: uint32(imageSize),
}
err = stream.SendAndClose(res)
if err != nil {
return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
}
log.Printf("saved image with id: %s, size: %d", imageID, imageSize)
return nil
}
If the image is saved successfully, we create a response object with the image ID and image size. Then we call stream.SendAndClose() to send the response to the client.
And finally we can write a log saying that the image is successfully saved with this ID and size.
Then we’re done with the server. Now let’s implement the client.
3. Implement the client
First I will refactor the code that we've written in the previous lectures a bit. Let’s add laptop as a parameter of the createLaptop() function.
func createLaptop(laptopClient pb.LaptopServiceClient, laptop *pb.Laptop) {
...
}
Then create separate functions to test the create laptop RPC & search laptop RPC:
func testCreateLaptop(laptopClient pb.LaptopServiceClient) {
createLaptop(laptopClient, sample.NewLaptop())
}
func testSearchLaptop(laptopClient pb.LaptopServiceClient) {
...
}
Now we will write a new function to test the upload image RPC and call it from the main function:
func testUploadImage(laptopClient pb.LaptopServiceClient) {
laptop := sample.NewLaptop()
createLaptop(laptopClient, laptop)
uploadImage(laptopClient, laptop.GetId(), "tmp/laptop.jpg")
}
func main() {
serverAddress := flag.String("address", "", "the server address")
flag.Parse()
log.Printf("dial server %s", *serverAddress)
conn, err := grpc.Dial(*serverAddress, grpc.WithInsecure())
if err != nil {
log.Fatal("cannot dial server: ", err)
}
laptopClient := pb.NewLaptopServiceClient(conn)
testUploadImage(laptopClient)
}
In the testUploadImage() function, we first generate a random laptop, and call the createLaptop() function to create it on the server.
Then we will write the uploadImage() function to upload an image of this laptop to the server. This function has 3 input parameters: the laptop client, the laptop ID, and the path to the laptop image.
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
file, err := os.Open(imagePath)
if err != nil {
log.Fatal("cannot open image file: ", err)
}
defer file.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := laptopClient.UploadImage(ctx)
if err != nil {
log.Fatal("cannot upload image: ", err)
}
...
}
First we call os.Open() to open the image file. If there’s an error, we write a fatal log. Else, we use a defer statement to close the file afterward.
Then we create a context with a timeout of 5 seconds, and call laptopClient.UploadImage() with that context. It will return a stream object and an error. If the error is not nil, we write a fatal log.
Otherwise, we create the first request to send some image information to the server, which includes the laptop ID, and the image type, or the extension of the image file:
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
...
req := &pb.UploadImageRequest{
Data: &pb.UploadImageRequest_Info{
Info: &pb.ImageInfo{
LaptopId: laptopID,
ImageType: filepath.Ext(imagePath),
},
},
}
err = stream.Send(req)
if err != nil {
log.Fatal("cannot send image info to server: ", err, stream.RecvMsg(nil))
}
...
}
After that, we call stream.Send() to send the first request to the server. If we get an error, we write a fatal log.
Else, we will create a buffered reader to read the content of the image file in chunks. Let’s say each chunk will be 1 KB, or 1024 bytes. We will read the image data chunks sequentially in a for loop:
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
...
reader := bufio.NewReader(file)
buffer := make([]byte, 1024)
for {
n, err := reader.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Fatal("cannot read chunk to buffer: ", err)
}
req := &pb.UploadImageRequest{
Data: &pb.UploadImageRequest_ChunkData{
ChunkData: buffer[:n],
},
}
err = stream.Send(req)
if err != nil {
log.Fatal("cannot send chunk to server: ", err)
}
}
...
}
First we call reader.Read() to read the data to the buffer. It will return the number of bytes read and an error. If the error is EOF, then it’s the end of the file, and we simply break the loop. Else, if the error is not nil, we write a fatal log.
Otherwise, we create a new request with the chunk data. Make sure that the chunk only contains the first n bytes of the buffer, since the last chunk might contain less than 1024 bytes.
Then we call stream.Send() to send it to the server. And again, we write a fatal log if an error occurs.
Finally, after the for loop, we call stream.CloseAndRecv() to receive a response from the server:
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
...
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatal("cannot receive response: ", err)
}
log.Printf("image uploaded with id: %s, size: %d", res.GetId(), res.GetSize())
...
}
If there's no error, we write a log saying that the image is successfully uploaded, and the server returns this ID and size.
And that’s it. The client is done. Let's put the laptop.jpg file in the tmp folder and run the server and client:
We got an error: cannot send chunk to server: EOF. This error message is not very useful since it doesn’t tell us exactly what went wrong.
But we know that the message comes from this log:
...
err = stream.Send(req)
if err != nil {
log.Fatal("cannot send chunk to server: ", err)
}
...
We got EOF because when an error occurs, the server closes the stream, and thus the client cannot send more data to it.
To get the real error that contains the gRPC status code, we must call stream.RecvMsg() with a nil parameter. The nil parameter basically means that we don't expect to receive any message; we just want to get the error that the function returns.
...
err = stream.Send(req)
if err != nil {
log.Fatal("cannot send chunk to server: ", err, stream.RecvMsg(nil))
}
...
Now if we rerun the client, we can see that the real error is InvalidArgument: the laptop doesn’t exist.
That's because the laptop ID was set to empty in the createLaptop() function in the previous lecture.
So let’s remove the line that clears the laptop ID and rerun the client.
This time it works. The image is uploaded successfully. If we open the img folder, we can see the laptop image is saved there:
Excellent!
OK, now let’s see what happens if a timeout occurs. Suppose that somehow the server is writing the data very slowly. Here I sleep 1 second before writing the chunk to the buffer.
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
for {
...
// write slowly
time.Sleep(time.Second)
_, err = imageData.Write(chunk)
if err != nil {
return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
}
}
...
}
Let’s try it.
After 5 seconds, we see an error log on the server. However, the status code is Unknown, and the message also contains another DeadlineExceeded error, which is not very nice.
So let’s fix this by checking the context error on the server side before calling receive on the stream:
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
for {
err := contextError(stream.Context())
if err != nil {
return err
}
log.Print("waiting to receive more data")
req, err := stream.Recv()
if err == io.EOF {
log.Print("no more data")
break
}
...
}
...
}
I have extracted the context error checking block from the create laptop RPC, and made it a separate function:
func contextError(ctx context.Context) error {
switch ctx.Err() {
case context.Canceled:
return logError(status.Error(codes.Canceled, "request is canceled"))
case context.DeadlineExceeded:
return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
default:
return nil
}
}
Here we use a switch statement to make it more concise and easier to read:
- In case the context error is Canceled, we log it and return the error with the Canceled status code.
- In case it is DeadlineExceeded, we do the same thing, but with the DeadlineExceeded status code.
- And for the default case, we just return nil.
Alright, let’s rerun the server and the client.
Now on the server side, we see a better error log with status code DeadlineExceeded. Perfect!
Let’s try another case where the uploaded image is larger than the maximum allowed size. I will change the max file size constraint to 1 KB instead of 1 MB.
const maxImageSize = 1 << 10
Then rerun the server and the client.
This time we got InvalidArgument: image is too large. And on the server side, it only receives 2 data chunks before the same error log is printed. So it works!
Now let’s learn how to write a test for this client-streaming RPC.
4. Write unit test
For this test, I'm gonna use tmp as the folder to store images. The first thing we need to do is to create a new in-memory laptop store, and create a new disk image store with the tmp image folder.
func TestClientUploadImage(t *testing.T) {
t.Parallel()
testImageFolder := "../tmp"
laptopStore := service.NewInMemoryLaptopStore()
imageStore := service.NewDiskImageStore(testImageFolder)
laptop := sample.NewLaptop()
err := laptopStore.Save(laptop)
require.NoError(t, err)
serverAddress := startTestLaptopServer(t, laptopStore, imageStore, nil)
laptopClient := newTestLaptopClient(t, serverAddress)
...
}
We generate a sample laptop, and save it to the laptop store. Then we start the test server and make a new laptop client.
The image we’re gonna upload is the laptop.jpg file inside the tmp folder. So we open the file, check that there’s no error, and defer closing it. Then we call laptopClient.UploadImage() to get the stream and require that no error should occur.
func TestClientUploadImage(t *testing.T) {
...
imagePath := fmt.Sprintf("%s/laptop.jpg", testImageFolder)
file, err := os.Open(imagePath)
require.NoError(t, err)
defer file.Close()
stream, err := laptopClient.UploadImage(context.Background())
require.NoError(t, err)
...
}
Actually the rest of the test is very similar to what we’ve done in the client/main.go file. We send the first request that contains only the metadata of the laptop image.
func TestClientUploadImage(t *testing.T) {
...
imageType := filepath.Ext(imagePath)
req := &pb.UploadImageRequest{
Data: &pb.UploadImageRequest_Info{
Info: &pb.ImageInfo{
LaptopId: laptop.GetId(),
ImageType: imageType,
},
},
}
err = stream.Send(req)
require.NoError(t, err)
...
}
Then we use a for loop to send the content of the image file in chunks:
func TestClientUploadImage(t *testing.T) {
...
reader := bufio.NewReader(file)
buffer := make([]byte, 1024)
size := 0
for {
n, err := reader.Read(buffer)
if err == io.EOF {
break
}
require.NoError(t, err)
size += n
req := &pb.UploadImageRequest{
Data: &pb.UploadImageRequest_ChunkData{
ChunkData: buffer[:n],
},
}
err = stream.Send(req)
require.NoError(t, err)
}
...
}
We also want to keep track of the total image size, so I define a size variable for that. And every time we read a new data block, we add n to the size.
Last step, we call stream.CloseAndRecv() to get the response from the server, and we check that the returned ID is not a zero value, and that the returned image size equals size.
func TestClientUploadImage(t *testing.T) {
...
res, err := stream.CloseAndRecv()
require.NoError(t, err)
require.NotZero(t, res.GetId())
require.EqualValues(t, size, res.GetSize())
...
}
We also want to check that the image is saved to the correct folder on the server. It should be inside the test image folder, with the image ID as the file name and the image type as the file extension. We can use the require.FileExists() function to check that.
func TestClientUploadImage(t *testing.T) {
...
savedImagePath := fmt.Sprintf("%s/%s%s", testImageFolder, res.GetId(), imageType)
require.FileExists(t, savedImagePath)
require.NoError(t, os.Remove(savedImagePath))
}
And finally we need to remove the file at the end of the test.
Alright, let’s run it.
It passed!
And that’s it for today’s lecture about client-streaming RPC. Thank you for reading and I will see you in the next article!