
Upload file in chunks with client-streaming gRPC - Go

TECH SCHOOL ・15 min read

Welcome back to the gRPC course!

To recap, there are 4 types of gRPC. In the previous lectures, we learned how to implement unary RPC and server-streaming RPC in Golang.

Today we will learn how to implement and test the 3rd type of gRPC, which is client-streaming. Specifically, we will build an API to upload an image file to the server in multiple chunks.

Here's the link to the full gRPC course playlist on Youtube
Github repository: pcbook-go and pcbook-java
Gitlab repository: pcbook-go and pcbook-java

1. Define client-streaming RPC in proto file

Let's define the new RPC in the laptop_service.proto file.

First, we need an UploadImageRequest message:

message UploadImageRequest {
  oneof data {
    ImageInfo info = 1;
    bytes chunk_data = 2;
  };
}

The idea is to divide the image file into multiple chunks, and send them one by one to the server in each request message. I use a oneof field here because the first request will only contain the metadata, or some basic information of the image. Then the following requests will contain the image data chunks.

The ImageInfo message will have 2 string fields: the laptop ID and the image type, such as ".jpg" or ".png".

message ImageInfo {
  string laptop_id = 1;
  string image_type = 2;
}
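
To get a feel for what the oneof means on the Go side, here is a minimal hand-written sketch. The types below (imageInfo, requestData, infoData, chunkData, uploadImageRequest) and the describe() function are hypothetical stand-ins that only mimic the shape of the oneof wrappers used later in this article; they are not the generated pb code. The idea it illustrates: a request carries exactly one variant at a time, and a type switch tells them apart.

```go
package main

import "fmt"

// imageInfo mirrors the two string fields of the ImageInfo message.
type imageInfo struct {
    LaptopID  string
    ImageType string
}

// requestData is implemented only by the two wrapper types below,
// so a request can carry exactly one of them at a time.
type requestData interface{ isRequestData() }

type infoData struct{ Info *imageInfo }
type chunkData struct{ ChunkData []byte }

func (*infoData) isRequestData()  {}
func (*chunkData) isRequestData() {}

// uploadImageRequest is a hypothetical stand-in for the request message.
type uploadImageRequest struct{ Data requestData }

// describe reports which oneof variant is set on the request.
func describe(req *uploadImageRequest) string {
    switch d := req.Data.(type) {
    case *infoData:
        return fmt.Sprintf("info: laptop=%s type=%s", d.Info.LaptopID, d.Info.ImageType)
    case *chunkData:
        return fmt.Sprintf("chunk: %d bytes", len(d.ChunkData))
    default:
        return "no data set"
    }
}

func main() {
    first := &uploadImageRequest{Data: &infoData{Info: &imageInfo{LaptopID: "laptop-1", ImageType: ".jpg"}}}
    next := &uploadImageRequest{Data: &chunkData{ChunkData: make([]byte, 1024)}}
    fmt.Println(describe(first))
    fmt.Println(describe(next))
}
```

In the real service, the server would expect the info variant on the first message and the chunk variant on every message after it.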

Then we define an UploadImageResponse message, which will be returned to the client once the server has received all chunks of the image:

message UploadImageResponse {
  string id = 1;
  uint32 size = 2;
}

This message contains the ID of the image which is generated by the server, and the total size of the uploaded image in bytes.

OK, now we define the UploadImage RPC. This is a client-streaming RPC, so it takes a stream of UploadImageRequest as input and returns a single UploadImageResponse.

service LaptopService {
  ...
  rpc UploadImage(stream UploadImageRequest) returns (UploadImageResponse) {};
}

Alright, now let’s run make gen to generate the code.

Generate codes

After the code is successfully generated, we will see some errors in the code because the laptop server hasn’t implemented the UploadImage() method that is required by the LaptopServiceServer interface.

Some errors

The errors will be fixed after we implement the server.

2. Implement the server

Let’s open the laptop_server.go file and add the UploadImage() function to the LaptopServer struct. We can easily find its signature inside the generated laptop_service.pb.go file. Just copy and paste it to the laptop_server.go file:

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    return nil
}

Let’s return nil for now. We will come back to this function later after implementing the image store.

2.1 Implement the image store

The role of the store is to save the uploaded image file somewhere on the server or on the cloud. To make it more generic and easy to change to different types of storage, we define ImageStore as an interface.

type ImageStore interface {
    Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error)
}

It has 1 function to save a laptop image, which takes 3 input parameters: the laptop ID, the image type, and the image data given by a bytes buffer. And it will return the ID of the saved image, or an error.
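
Because ImageStore is an interface, a different backend can be swapped in without touching the server. As a rough sketch of that idea, here is a hypothetical in-memory implementation. InMemoryImageStore, its Size() method, and the saveBytes() helper are names I made up for illustration, and the ID is generated with crypto/rand from the standard library instead of a UUID package, purely to keep the example self-contained.

```go
package main

import (
    "bytes"
    "crypto/rand"
    "encoding/hex"
    "fmt"
    "sync"
)

// ImageStore is the interface defined in the article.
type ImageStore interface {
    Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error)
}

// InMemoryImageStore is a hypothetical implementation that keeps images in a map.
type InMemoryImageStore struct {
    mutex  sync.RWMutex
    images map[string][]byte
}

func NewInMemoryImageStore() *InMemoryImageStore {
    return &InMemoryImageStore{images: make(map[string][]byte)}
}

// Save stores a copy of the image bytes under a random hex ID.
// laptopID and imageType are accepted to satisfy the interface but are not kept here.
func (store *InMemoryImageStore) Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error) {
    idBytes := make([]byte, 16)
    if _, err := rand.Read(idBytes); err != nil {
        return "", fmt.Errorf("cannot generate image id: %w", err)
    }
    imageID := hex.EncodeToString(idBytes)

    store.mutex.Lock()
    defer store.mutex.Unlock()
    store.images[imageID] = append([]byte(nil), imageData.Bytes()...)
    return imageID, nil
}

// Size returns the stored size of an image, or -1 if the ID is unknown.
func (store *InMemoryImageStore) Size(imageID string) int {
    store.mutex.RLock()
    defer store.mutex.RUnlock()
    data, ok := store.images[imageID]
    if !ok {
        return -1
    }
    return len(data)
}

// saveBytes is a demo helper: it wraps raw bytes in a buffer and saves them.
func saveBytes(store ImageStore, data []byte) (string, error) {
    return store.Save("laptop-1", ".jpg", *bytes.NewBuffer(data))
}

func main() {
    store := NewInMemoryImageStore()
    id, err := saveBytes(store, []byte("fake image bytes"))
    if err != nil {
        fmt.Println("save failed:", err)
        return
    }
    fmt.Printf("saved %d bytes under id %s\n", store.Size(id), id)
}
```

A store like this could be handy in unit tests, where writing files to disk is not the thing being tested.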

Next we will implement this interface with a DiskImageStore, which will save image files to the disk, and store its information in memory.

type DiskImageStore struct {
    mutex       sync.RWMutex
    imageFolder string
    images      map[string]*ImageInfo
}

Similar to the laptop store, we need a mutex to handle concurrency. Then we need the path of the folder to save laptop images. And finally a map whose key is the image ID and whose value is some information about the image.

The ImageInfo contains 3 fields: the ID of the laptop, the type of the image (or its file extension: jpg/png), and the path to the image file on disk.

type ImageInfo struct {
    LaptopID string
    Type     string
    Path     string
}

Let’s write a function to create a new DiskImageStore. It has only 1 input, which is the image folder. And inside, we just need to initialize the images map:

func NewDiskImageStore(imageFolder string) *DiskImageStore {
    return &DiskImageStore{
        imageFolder: imageFolder,
        images:      make(map[string]*ImageInfo),
    }
}

Now we have to implement the Save() function, which is required by the ImageStore interface.

First we have to generate a new random UUID for the image. We make the path to store the image by joining the image folder, image ID, and image type.

Then we call os.Create() to create the file. And we call imageData.WriteTo() to write the image data to the created file.

func (store *DiskImageStore) Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error) {
    imageID, err := uuid.NewRandom()
    if err != nil {
        return "", fmt.Errorf("cannot generate image id: %w", err)
    }

    imagePath := fmt.Sprintf("%s/%s%s", store.imageFolder, imageID, imageType)

    file, err := os.Create(imagePath)
    if err != nil {
        return "", fmt.Errorf("cannot create image file: %w", err)
    }
    // Close the file when Save returns so the file descriptor is not leaked.
    defer file.Close()

    _, err = imageData.WriteTo(file)
    if err != nil {
        return "", fmt.Errorf("cannot write image to file: %w", err)
    }

    store.mutex.Lock()
    defer store.mutex.Unlock()

    store.images[imageID.String()] = &ImageInfo{
        LaptopID: laptopID,
        Type:     imageType,
        Path:     imagePath,
    }

    return imageID.String(), nil
}

If the file is written successfully, we need to save its information to the in-memory map. So we have to acquire the write lock of the store.

Then we save the image information to the map, where the key is the ID of the image and the value contains the laptop ID, the image type, and the path to the image file.

Finally we return the image ID to the caller. And that’s it, we’re done with the image store. Now let’s go back to the server.
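
One small note on the path-building step in Save(): it uses fmt.Sprintf with a literal "/". As an alternative sketch (a substitution on my part, not what the code above does), filepath.Join from the standard library joins path elements with the OS-specific separator, and filepath.Ext recovers the extension that the client will later send as the image type. The imagePath() function name is hypothetical.

```go
package main

import (
    "fmt"
    "path/filepath"
)

// imagePath joins the folder and the file name with the OS-specific separator.
// imageType is expected to include the leading dot, e.g. ".jpg".
func imagePath(imageFolder, imageID, imageType string) string {
    return filepath.Join(imageFolder, imageID+imageType)
}

func main() {
    p := imagePath("img", "1234", ".jpg")
    fmt.Println(p)               // path of the saved image
    fmt.Println(filepath.Ext(p)) // extension, including the dot
}
```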

2.2 Implement the UploadImage RPC

We need to add the new image store to the LaptopServer struct, and add the imageStore as the second parameter of the NewLaptopServer() function:

type LaptopServer struct {
    laptopStore LaptopStore
    imageStore  ImageStore
}

func NewLaptopServer(laptopStore LaptopStore, imageStore ImageStore) *LaptopServer {
    return &LaptopServer{laptopStore, imageStore}
}

Then in the main.go file of the server, we also need to pass 2 stores into the NewLaptopServer function: one is the laptop store, and the other is the image store. Let's say we will save the uploaded images to the "img" folder.

func main() {
    ...

    laptopStore := service.NewInMemoryLaptopStore()
    imageStore := service.NewDiskImageStore("img")
    laptopServer := service.NewLaptopServer(laptopStore, imageStore)

    ...
}

Now let’s implement the UploadImage() function.

First we call stream.Recv() to receive the first request, which contains the metadata information of the image. If there’s an error, we write a log and return the status code Unknown to the client.

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    req, err := stream.Recv()
    if err != nil {
        return logError(status.Errorf(codes.Unknown, "cannot receive image info: %v", err))
    }

    ...
}

func logError(err error) error {
    if err != nil {
        log.Print(err)
    }
    return err
}

To be concise and not repeat myself, here I define a logError() function to log the error before returning it. It only prints a log if the error is not nil, and always returns the error to the caller.

Next we can get the laptop ID and the image type from the request. Let’s write a log here saying that we have received the upload-image request with this laptop ID and image type.

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    ...

    laptopID := req.GetInfo().GetLaptopId()
    imageType := req.GetInfo().GetImageType()
    log.Printf("receive an upload-image request for laptop %s with image type %s", laptopID, imageType)

    ...
}

Before saving the laptop image, we have to make sure that the laptop ID really exists. So we call server.laptopStore.Find() to find the laptop by ID.

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    ...

    laptop, err := server.laptopStore.Find(laptopID)
    if err != nil {
        return logError(status.Errorf(codes.Internal, "cannot find laptop: %v", err))
    }
    if laptop == nil {
        return logError(status.Errorf(codes.InvalidArgument, "laptop id %s doesn't exist", laptopID))
    }

    ...
}

If we get an error, we just log it and return it with the Internal status code. Otherwise, if the laptop is nil, which means it was not found, we log and return an error with status code InvalidArgument (or you might use NotFound to be more precise).

Now if everything goes well and the laptop is found, we can start receiving the image chunk data. So let’s create a new byte buffer to store the chunks, and also a variable to keep track of the total image size.

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    ...

    imageData := bytes.Buffer{}
    imageSize := 0

    for {
        log.Print("waiting to receive more data")

        req, err := stream.Recv()
        if err == io.EOF {
            log.Print("no more data")
            break
        }
        if err != nil {
            return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
        }

        ...
    }

    ...
}

Since we’re going to receive many requests from the stream, I use a for loop here. And inside the loop, we call stream.Recv() to get the request.

But this time, we first check if the error is EOF or not. If it is, this means that no more data will be sent, and we can safely break the loop. Else, if the error is still not nil, we return it with Unknown status code to the client.

Otherwise, if there’s no error, we can get the chunk data from the request. We get its size using the len() function, and add this size to the total image size.

Let’s say we don’t want the client to send an image that is too large, so we check whether the image size is greater than the maximum size, say 1 MB as defined by the constant maxImageSize (1 MB = 2^20 bytes = 1 << 20 bytes).

const maxImageSize = 1 << 20

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    ...

    imageData := bytes.Buffer{}
    imageSize := 0

    for {
        ...

        chunk := req.GetChunkData()
        size := len(chunk)

        log.Printf("received a chunk with size: %d", size)

        imageSize += size
        if imageSize > maxImageSize {
            return logError(status.Errorf(codes.InvalidArgument, "image is too large: %d > %d", imageSize, maxImageSize))
        }
        _, err = imageData.Write(chunk)
        if err != nil {
            return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
        }
    }

    ...
}

Now if the image size is greater than max image size, we can return an error with InvalidArgument status code, and a message saying the image is too large. Else, we can append the chunk to the image data with the Write() function. Also log and return Internal status code if an error occurs.
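
To make the receive loop easier to play with outside of gRPC, here is a minimal stdlib-only sketch of the same logic. The fakeStream type is a hypothetical stand-in for the real stream (its Recv() returns raw chunks instead of request messages), and receiveAll() and errTooLarge are names I introduced for illustration. It returns plain Go errors rather than gRPC status errors.

```go
package main

import (
    "bytes"
    "errors"
    "fmt"
    "io"
)

const maxImageSize = 1 << 20 // 1 MB, as in the article

// fakeStream is a hypothetical stand-in for the gRPC stream:
// Recv returns the next chunk, or io.EOF when the client is done sending.
type fakeStream struct {
    chunks [][]byte
    next   int
}

func (s *fakeStream) Recv() ([]byte, error) {
    if s.next >= len(s.chunks) {
        return nil, io.EOF
    }
    chunk := s.chunks[s.next]
    s.next++
    return chunk, nil
}

var errTooLarge = errors.New("image is too large")

// receiveAll collects chunks until io.EOF and rejects uploads above limit bytes.
func receiveAll(stream *fakeStream, limit int) ([]byte, error) {
    imageData := bytes.Buffer{}
    imageSize := 0
    for {
        chunk, err := stream.Recv()
        if err == io.EOF {
            break // no more data
        }
        if err != nil {
            return nil, fmt.Errorf("cannot receive chunk data: %w", err)
        }
        imageSize += len(chunk)
        if imageSize > limit {
            return nil, fmt.Errorf("%w: %d > %d", errTooLarge, imageSize, limit)
        }
        imageData.Write(chunk) // bytes.Buffer.Write always returns a nil error
    }
    return imageData.Bytes(), nil
}

func main() {
    ok := &fakeStream{chunks: [][]byte{make([]byte, 1024), make([]byte, 500)}}
    data, err := receiveAll(ok, maxImageSize)
    fmt.Println(len(data), err)

    tooBig := &fakeStream{chunks: [][]byte{make([]byte, 1024), make([]byte, 1024)}}
    _, err = receiveAll(tooBig, 1<<10)
    fmt.Println(err)
}
```

Note that the size check runs before the chunk is appended, so an oversized upload is rejected without buffering the chunk that crossed the limit.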

After the for loop, we have collected all data of the image in the buffer. So we can call imageStore.Save() to save the image data to the store and get back the image ID:

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    ...

    for {
        ...
    }

    imageID, err := server.imageStore.Save(laptopID, imageType, imageData)
    if err != nil {
        return logError(status.Errorf(codes.Internal, "cannot save image to the store: %v", err))
    }

    res := &pb.UploadImageResponse{
        Id:   imageID,
        Size: uint32(imageSize),
    }

    err = stream.SendAndClose(res)
    if err != nil {
        return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
    }

    log.Printf("saved image with id: %s, size: %d", imageID, imageSize)
    return nil
}

If the image is saved successfully, we create a response object with the image ID and image size. Then we call stream.SendAndClose() to send the response to the client.

And finally we can write a log saying that the image is successfully saved with this ID and size.

Then we’re done with the server. Now let’s implement the client.

3. Implement the client

First I will refactor the code that we've written in the previous lectures a bit. Let’s add laptop as a parameter of this createLaptop() function.

func createLaptop(laptopClient pb.LaptopServiceClient, laptop *pb.Laptop) {
    ...
}

Then create separate functions to test the create laptop RPC & search laptop RPC:

func testCreateLaptop(laptopClient pb.LaptopServiceClient) {
    createLaptop(laptopClient, sample.NewLaptop())
}

func testSearchLaptop(laptopClient pb.LaptopServiceClient) {
    ...
}

Now we will write a new function to test the upload image RPC and call it from the main function:

func testUploadImage(laptopClient pb.LaptopServiceClient) {
    laptop := sample.NewLaptop()
    createLaptop(laptopClient, laptop)
    uploadImage(laptopClient, laptop.GetId(), "tmp/laptop.jpg")
}

func main() {
    serverAddress := flag.String("address", "", "the server address")
    flag.Parse()
    log.Printf("dial server %s", *serverAddress)

    conn, err := grpc.Dial(*serverAddress, grpc.WithInsecure())
    if err != nil {
        log.Fatal("cannot dial server: ", err)
    }

    laptopClient := pb.NewLaptopServiceClient(conn)
    testUploadImage(laptopClient)
}

In the testUploadImage() function, we first generate a random laptop, and call createLaptop() function to create it on the server.

Then we will write the uploadImage() function to upload an image of this laptop to the server. This function has 3 input parameters: the laptop client, the laptop ID, and the path to the laptop image.

func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
    file, err := os.Open(imagePath)
    if err != nil {
        log.Fatal("cannot open image file: ", err)
    }
    defer file.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    stream, err := laptopClient.UploadImage(ctx)
    if err != nil {
        log.Fatal("cannot upload image: ", err)
    }

    ...
}

First we call os.Open() to open the image file. If there’s an error, we write a fatal log. Otherwise, we use defer to close the file afterward.

Then we create a context with timeout of 5 seconds, and call laptopClient.UploadImage() with that context. It will return a stream object, and an error. If error is not nil, we write a fatal log.

Otherwise, we create the first request to send some image information to the server, which includes the laptop ID, and the image type, or the extension of the image file:

func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
    ...

    req := &pb.UploadImageRequest{
        Data: &pb.UploadImageRequest_Info{
            Info: &pb.ImageInfo{
                LaptopId:  laptopID,
                ImageType: filepath.Ext(imagePath),
            },
        },
    }

    err = stream.Send(req)
    if err != nil {
        log.Fatal("cannot send image info to server: ", err, stream.RecvMsg(nil))
    }

    ...
}

After that, we call stream.Send() to send the first request to the server. If we get an error, write a fatal log.

Else, we will create a buffer reader to read the content of the image file in chunks. Let’s say each chunk will be 1 KB, or 1024 bytes. We will read the image data chunks sequentially in a for loop:

func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
    ...

    reader := bufio.NewReader(file)
    buffer := make([]byte, 1024)

    for {
        n, err := reader.Read(buffer)
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal("cannot read chunk to buffer: ", err)
        }

        req := &pb.UploadImageRequest{
            Data: &pb.UploadImageRequest_ChunkData{
                ChunkData: buffer[:n],
            },
        }

        err = stream.Send(req)
        if err != nil {
            log.Fatal("cannot send chunk to server: ", err)
        }
    }

    ...
}

First we call reader.Read() to read the data into the buffer. It will return the number of bytes read and an error. If the error is EOF, we’ve reached the end of the file, so we simply break the loop. Else, if the error is not nil, we write a fatal log.

Otherwise, we create a new request with the chunk data. Make sure that the chunk only contains the first n bytes of the buffer, since the last chunk might contain less than 1024 bytes.

Then we call stream.Send() to send it to the server. And again, write a fatal log if an error occurs.
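
The chunking part of this loop can be tried on its own, without a server. Below is a minimal sketch under a few assumptions: readChunks() and chunkSizes() are hypothetical helper names, the input is an in-memory byte slice rather than a file, and each chunk is copied because the sketch keeps the chunks around while the buffer is reused on the next Read(). It also handles any bytes returned before looking at the error, which is what the io.Reader documentation recommends.

```go
package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
)

// readChunks reads r in pieces of at most chunkSize bytes
// and returns a copy of each piece.
func readChunks(r io.Reader, chunkSize int) ([][]byte, error) {
    reader := bufio.NewReader(r)
    buffer := make([]byte, chunkSize)
    var chunks [][]byte
    for {
        n, err := reader.Read(buffer)
        if n > 0 {
            // Only the first n bytes are valid; copy them because
            // buffer is overwritten by the next Read.
            chunks = append(chunks, append([]byte(nil), buffer[:n]...))
        }
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, fmt.Errorf("cannot read chunk to buffer: %w", err)
        }
    }
    return chunks, nil
}

// chunkSizes is a demo helper that reports the size of each chunk
// produced for an in-memory payload.
func chunkSizes(data []byte, chunkSize int) ([]int, error) {
    chunks, err := readChunks(bytes.NewReader(data), chunkSize)
    if err != nil {
        return nil, err
    }
    sizes := make([]int, len(chunks))
    for i, c := range chunks {
        sizes[i] = len(c)
    }
    return sizes, nil
}

func main() {
    // 2500 bytes do not divide evenly by 1024, so the last chunk is shorter.
    sizes, err := chunkSizes(make([]byte, 2500), 1024)
    fmt.Println(sizes, err)
}
```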

Finally, after the for loop, we call stream.CloseAndRecv() to receive a response from the server:

func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
    ...

    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatal("cannot receive response: ", err)
    }

    log.Printf("image uploaded with id: %s, size: %d", res.GetId(), res.GetSize())

    ...
}

If there's no error, we write a log saying that the image is successfully uploaded, and the server returns this ID and size.

And that’s it. The client is done. Let's put the laptop.jpg file in the tmp folder and run the server and client:

EOF error

We got an error: cannot send chunk to server: EOF. This error message is not very useful since it doesn’t tell us exactly why the send failed.

But we know that the message comes from this log:

    ...

    err = stream.Send(req)
    if err != nil {
        log.Fatal("cannot send chunk to server: ", err)
    }

    ...

We got EOF because when an error occurs, the server closes the stream, and thus the client cannot send more data to it.

To get the real error that contains the gRPC status code, we must call stream.RecvMsg() with a nil parameter. The nil parameter basically means that we don't expect to receive any message, but we just want to get the error that function returns.

    ...

    err = stream.Send(req)
    if err != nil {
        log.Fatal("cannot send chunk to server: ", err, stream.RecvMsg(nil))
    }

    ...

Now if we rerun the client, we can see that the real error is InvalidArgument, laptop doesn’t exist.

Error laptop doesn't exist

And it's because the laptop ID is empty, as we set it in the createLaptop() function in the previous lecture:

Empty laptop ID

So let’s remove the line that sets the laptop ID to an empty string and rerun the client.

This time it works. The image is uploaded successfully. If we open the img folder, we can see the laptop image is saved there:

Saved laptop image

Excellent!

OK, now let’s see what happens if a timeout occurs. Suppose that somehow the server is writing the data very slowly. Here I sleep 1 second before writing the chunk to the buffer.

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    ...

    for {
        ...

        // write slowly
        time.Sleep(time.Second)

        _, err = imageData.Write(chunk)
        if err != nil {
            return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
        }
    }

    ...
}

Let’s try it.

Deadline exceeded

After 5 seconds, we see an error log on the server. However, the status code is Unknown, and the message wraps another DeadlineExceeded error, which is not very nice.

So let’s fix this by checking the context error on server side before calling receive on the stream:

func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
    ...

    for {
        err := contextError(stream.Context())
        if err != nil {
            return err
        }

        log.Print("waiting to receive more data")

        req, err := stream.Recv()
        if err == io.EOF {
            log.Print("no more data")
            break
        }
        ...
    }

    ...
}

I have extracted the context error checking block from the create laptop RPC, and made it a separate function:

func contextError(ctx context.Context) error {
    switch ctx.Err() {
    case context.Canceled:
        return logError(status.Error(codes.Canceled, "request is canceled"))
    case context.DeadlineExceeded:
        return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
    default:
        return nil
    }
}

Here we use a switch statement to make it more concise and easier to read:

  • In case the context error is Canceled, we log it and return the error with Canceled status code.
  • In case DeadlineExceeded, we do the same thing, but with DeadlineExceeded status code.
  • And for default case, we just return nil.
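
To see the three branches in action without running the server, here is a small stdlib-only sketch. The describeContext() and demo() functions are hypothetical names; describeContext() uses the same switch on ctx.Err() but returns a plain label instead of a gRPC status error.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// describeContext maps ctx.Err() to a short label,
// using the same switch structure as contextError.
func describeContext(ctx context.Context) string {
    switch ctx.Err() {
    case context.Canceled:
        return "canceled"
    case context.DeadlineExceeded:
        return "deadline exceeded"
    default:
        return "ok"
    }
}

// demo builds one live, one canceled, and one expired context and labels each.
func demo() (live, canceled, expired string) {
    liveCtx, cancelLive := context.WithCancel(context.Background())
    defer cancelLive()

    canceledCtx, cancel := context.WithCancel(context.Background())
    cancel() // cancel right away

    // A deadline in the past yields a context that is already expired.
    expiredCtx, cancelExpired := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
    defer cancelExpired()

    return describeContext(liveCtx), describeContext(canceledCtx), describeContext(expiredCtx)
}

func main() {
    live, canceled, expired := demo()
    fmt.Println("live:", live)
    fmt.Println("canceled:", canceled)
    fmt.Println("expired:", expired)
}
```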

Alright, let’s rerun the server and the client.

Better error

Now on the server side, we see a better error log with status code DeadlineExceeded. Perfect!

Let’s try another case where the uploaded image is larger than the maximum allowed size. I will change the max file size constraint to 1 KB instead of 1 MB.

const maxImageSize = 1 << 10

Then rerun the server and the client.

Image too large

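This time we got InvalidArgument: image is too large. On the server side, only 2 data chunks are received before the same error is logged: the first 1024-byte chunk brings the total to exactly 1 KB, which is not greater than the limit, and the second chunk pushes it over. So it works!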

Now let’s learn how to write a test for this client-streaming RPC.

4. Write unit test

For this test, I'm gonna use tmp as the folder to store images. The first thing we need to do is to create a new in-memory laptop store, and create a new disk image store with the tmp image folder.

func TestClientUploadImage(t *testing.T) {
    t.Parallel()

    testImageFolder := "../tmp"

    laptopStore := service.NewInMemoryLaptopStore()
    imageStore := service.NewDiskImageStore(testImageFolder)

    laptop := sample.NewLaptop()
    err := laptopStore.Save(laptop)
    require.NoError(t, err)

    serverAddress := startTestLaptopServer(t, laptopStore, imageStore, nil)
    laptopClient := newTestLaptopClient(t, serverAddress)

    ...
}

We generate a sample laptop, and save it to the laptop store. Then we start the test server and make a new laptop client.

The image we’re gonna upload is the laptop.jpg file inside the tmp folder. So we open the file, check that there’s no error, and defer closing it. Then we call laptopClient.UploadImage() to get the stream and require that no error occurs.

func TestClientUploadImage(t *testing.T) {
    ...

    imagePath := fmt.Sprintf("%s/laptop.jpg", testImageFolder)
    file, err := os.Open(imagePath)
    require.NoError(t, err)
    defer file.Close()

    stream, err := laptopClient.UploadImage(context.Background())
    require.NoError(t, err)

    ...
}

Actually the rest of the test is very similar to what we’ve done in the client/main.go file. We send the first request that contains only the metadata of the laptop image.

func TestClientUploadImage(t *testing.T) {
    ...

    imageType := filepath.Ext(imagePath)
    req := &pb.UploadImageRequest{
        Data: &pb.UploadImageRequest_Info{
            Info: &pb.ImageInfo{
                LaptopId:  laptop.GetId(),
                ImageType: imageType,
            },
        },
    }

    err = stream.Send(req)
    require.NoError(t, err)

    ...
}

Then we use a for loop to send the content of the image file in chunks:

func TestClientUploadImage(t *testing.T) {
    ...

    reader := bufio.NewReader(file)
    buffer := make([]byte, 1024)
    size := 0

    for {
        n, err := reader.Read(buffer)
        if err == io.EOF {
            break
        }

        require.NoError(t, err)
        size += n

        req := &pb.UploadImageRequest{
            Data: &pb.UploadImageRequest_ChunkData{
                ChunkData: buffer[:n],
            },
        }

        err = stream.Send(req)
        require.NoError(t, err)
    }

    ...
}

We also want to keep track of the total image size, so I define a size variable for that. And every time we read a new data block, we add n to the size.

As the last step, we call stream.CloseAndRecv() to get the response from the server, and we check that the returned ID is not a zero-value and that the returned image size equals size.

func TestClientUploadImage(t *testing.T) {
    ...

    res, err := stream.CloseAndRecv()
    require.NoError(t, err)
    require.NotZero(t, res.GetId())
    require.EqualValues(t, size, res.GetSize())

    ...
}

We also want to check that the image is saved to the correct folder on the server. It should be inside the test image folder, with the image ID as the file name and the image type as the file extension. We can use the require.FileExists() function to check that.

func TestClientUploadImage(t *testing.T) {
    ...

    savedImagePath := fmt.Sprintf("%s/%s%s", testImageFolder, res.GetId(), imageType)
    require.FileExists(t, savedImagePath)
    require.NoError(t, os.Remove(savedImagePath))
}

And finally we need to remove the file at the end of the test.

Alright, let’s run it.

Test passed

It passed!

And that’s it for today’s lecture about client-streaming RPC. Thank you for reading and I will see you in the next article!


If you like the article, please subscribe to our Youtube channel and follow us on Twitter for more tutorials in the future.
