DEV Community

Cover image for 13 tricks for the new Bigquery Storage Write API in Python
matthieucham for Stack Labs

Posted on • Updated on

13 tricks for the new Bigquery Storage Write API in Python

In order to stream data into a BigQuery table programmatically, Google is promoting a new API: The Storage Write API

Hence, the usual API and its ominous tabledata.insertAll method is now called "Legacy streaming API" which does not look very appealing when starting a new project.

Indeed, as stated in the official Google Cloud documentation:

For new projects, we recommend using the BigQuery Storage Write API instead of the tabledata.insertAll method.

Moreover, the new API is advertised with lower price and new features such as the possibility of exactly-once delivery.

Exciting, isn't it ?

Well, it is, but the Python client wrapping the new API is very bare-metal and its usage does not feel pythonic at all. As a consequence, integrating this API is much more difficult than usual with other Google Cloud clients, which is quite surprising. Having recently completed the integration of this new product I can speak from experience: I faced an unusually high number of challenges to integrate the Storage Write API into a Python application, for the most common use case: writing data rows into a BigQuery table.

This article aims to list these issues and help future developers to overcome them.

Describe the target schema with Protobuf

Protocol Buffers aka Protobuf is the portable exchange format widely used amongst Google Cloud API. It is usually hidden in the implementation when client libraries are used. With the new streaming API however, you will have to dive into it.

Protobuf relies on a schema descriptor. It describes how exchanged data are structured. The descriptor is written as a .proto text file where all fields, their type and their cardinality are listed.

The first thing to do when integrating the Storage Write API is to write the proto file. The message description must match the schema of the target BigQuery table:

  • same field names (case insensitive)
  • same structure (nested types are supported)
  • compatible field types (see the type mapping table here)

Trick #1: use proto2 syntax

Protobuf now exists in two flavours: proto2 and proto3, the newest.
proto2 works well with BigQuery, whereas there are some issues with proto3 which is fairly recent. Moreover, all examples provided by GCP currently use proto2. So for now I recommend to stick to this version.

Trick #2: all your fields are optional

In proto2 syntax you can declare a field as optional or required. This possibility is removed from proto3 (optional is implicit). In proto2, it is now recommended by Google to declare all your fields as optional, even if they are REQUIRED in the Bigquery schema. However, you will still see some required fields in GCP examples like here.

Trick #3: auto-generate the proto file

Writing a .proto descriptor can be very tedious, if the target schema has many columns with deep nested structures: don't forget that the descriptor has to match the target schema exactly !

You can ease the pain by autogenerating some of the proto file from the bigquery schema. First, download the target schema from bq:

bq show --schema --format=prettyjson dataset_name:project_name:target_table_name > schema_target_table.json
Enter fullscreen mode Exit fullscreen mode

Then use some scripting to convert the downloaded schema file into a proto. ParseToProtoBuffer.py, courtesy of matthiasa4, is useful for inspiration.

Trick #4 : Bigquery TIMESTAMP are protobuf int64

Even though protobuf provides a timestamp data type, the best way to send a timestamp value to Bigquery is to declare an int64 field.
Set the field value to the Epoch timestamp in microseconds and it will be automatically converted into a Bigquery TIMESTAMP

Generate Python Protobuf objects

The next step is to generate Python code from the .proto file.

Trick #5: install or upgrade protoc

Ensure you have installed the latest version of protoc, the Protobuf compiler.

The aim of this software is to generate code artefacts from proto files. Several flavours are available. Of course we pick Python. Invoke protoc like this:

protoc -I=. --python_out=.  schema_target_table.proto
Enter fullscreen mode Exit fullscreen mode

The outcome is a file named schema_target_table_pb2.py

The content of the generated file is surprising: it appears to be lacking a lot of definitions ! The reason is that the missing parts are going to be dynamically inserted at runtime by the Protobuf Python library. As a consequence:

  • your IDE will be mad about you
  • Pylint will insult you
  • you have to take a guess about the missing definition names

Come on Google, are you serious ?

Trick #6: make _pb2.py files pass Pylint

Simply put the following line on top of each _pb2 file, and Pylint will leave you alone:

# pylint: skip-file
Enter fullscreen mode Exit fullscreen mode

Trick #7: Import the missing classes

The generated classes have the following format:

  • The same name as the message
  • In case of a nested type, it will be accessible as a class variable of the parent type / class

Let's illustrate. The following proto file

syntax = "proto2";

message Mymessage {
  optional string myfield = 1;

  message Mysubmessage {
    optional string mysubfield = 1;
  }

  optional Mysubmessage mycomplexfield = 2;
  repeated string collection = 3;
}
Enter fullscreen mode Exit fullscreen mode

Will generate python classes which can be imported like this:

from .schema_target_table_pb2.py import Mymessage


submessage_instance = Mymessage.Mysubmessage()
Enter fullscreen mode Exit fullscreen mode

Needless to say, your IDE will turn red because of these imports. You will have to tame Pylint too:

# pylint: disable=no-name-in-module
Enter fullscreen mode Exit fullscreen mode

Set Protobuf object fields

Filling proto fields up is very counterintuitive. Good job that Google provides an exhaustive documentation about it.

Here is a straight-to-the-point TL;DR:

Trick #8: Simple (scalar) type fields can be directly assigned

mymsg = Mymessage()
mymsg.myfield = "toto"
Enter fullscreen mode Exit fullscreen mode

Trick #9: Use CopyFrom for nested type fields

Yes, CopyFrom(), a Python method name in CamelCase starting with an upper letter. Come on, Google !

Anyway, you cannot assign a complex field directly:

mymsg = Mymessage()
mymsg.myfield = "toto"
mysubmsg = Mymessage.Mysubmessage()
mymsg.mycomplexfield.CopyFrom(mysubmsg)
Enter fullscreen mode Exit fullscreen mode

Trick #10: Use append for repeated fields

You mustn't instanciate an empty list. Append it as if it existed

mymsg = Mymessage()
mymsg.collection.append("titi")
Enter fullscreen mode Exit fullscreen mode

Store Protobuf object into Bigquery

The next step is to store the Protobuf objects into Bigquery. There again there are some tricks to achieve this:

Trick #11: Be a dataEditor

The user or service account performing the storage must have bigquery.tables.updateData permission on the target table.

You get this permission in the bigquery.dataEditor role

Trick #12: Don't set a package name in the proto file

In many proto file samples a package directive is set:

package foo.bar;

message Mymessage{...
Enter fullscreen mode Exit fullscreen mode

This is to avoid name clashes. But they are not really useful in Python (generated classes are identified by their file path) and moreover package names are not supported by Bigquery in nested message types when storing

So, just don't set a package.

Trick #13: abstract the storage in a manager.

The Protobuf object is ready to be inserted at last ! Adapt the snippet given by Google to your own code to perform the storage. As you can see, it's not really a one-liner: more than 20 lines are necessary just to setup the destination stream. Besides, each append operation requires an AppendRowsRequest to be created, which is tedious too.

It's a good idea to wrap all these tasks in a practical Manager class for your application to use. Here is an example implementation:

"""Wrapper around BigQuery call."""
from __future__ import annotations
from typing import Any, Iterable
import logging
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions

from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2
from google.protobuf.descriptor import Descriptor



class DefaultStreamManager:  # pragma: no cover
    """Manage access to the _default stream write streams."""

    def __init__(
        self,
        table_path: str,
        message_protobuf_descriptor: Descriptor,
        bigquery_storage_write_client: bigquery_storage.BigQueryWriteClient,
    ):
        """Init."""
        self.stream_name = f"{table_path}/_default"
        self.message_protobuf_descriptor = message_protobuf_descriptor
        self.write_client = bigquery_storage_write_client
        self.append_rows_stream = None

    def _init_stream(self):
        """Init the underlying stream manager."""
        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()
        # The initial request must contain the stream name.
        request_template.write_stream = self.stream_name
        # So that BigQuery knows how to parse the serialized_rows, generate a
        # protocol buffer representation of our message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()  # pylint: disable=no-member
        self.message_protobuf_descriptor.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data
        # Create an AppendRowsStream using the request template created above.
        self.append_rows_stream = writer.AppendRowsStream(
            self.write_client, request_template
        )

    def send_appendrowsrequest(
        self, request: types.AppendRowsRequest
    ) -> writer.AppendRowsFuture:
        """Send request to the stream manager. Init the stream manager if needed."""
        try:
            if self.append_rows_stream is None:
                self._init_stream()
            return self.append_rows_stream.send(request)
        except bqstorage_exceptions.StreamClosedError:
            # the stream needs to be reinitialized
            self.append_rows_stream.close()
            self.append_rows_stream = None
            raise

    # Use as a context manager

    def __enter__(self) -> DefaultStreamManager:
        """Enter the context manager. Return the stream name."""
        self._init_stream()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the context manager : close the stream."""
        if self.append_rows_stream is not None:
            # Shutdown background threads and close the streaming connection.
            self.append_rows_stream.close()


class BigqueryWriteManager:
    """Encapsulation for bigquery client."""

    def __init__(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        bigquery_storage_write_client: bigquery_storage.BigQueryWriteClient,
        pb2_descriptor: Descriptor,
    ):  # pragma: no cover
        """Create a BigQueryManager."""
        self.bigquery_storage_write_client = bigquery_storage_write_client

        self.table_path = self.bigquery_storage_write_client.table_path(
            project_id, dataset_id, table_id
        )
        self.pb2_descriptor = pb2_descriptor

    def write_rows(self, pb_rows: Iterable[Any]) -> None:
        """Write data rows."""
        with DefaultStreamManager(
            self.table_path, self.pb2_descriptor, self.bigquery_storage_write_client
        ) as target_stream_manager:
            proto_rows = types.ProtoRows()
            # Create a batch of row data by appending proto2 serialized bytes to the
            # serialized_rows repeated field.
            for row in pb_rows:
                proto_rows.serialized_rows.append(row.SerializeToString())
            # Create an append row request containing the rows
            request = types.AppendRowsRequest()
            proto_data = types.AppendRowsRequest.ProtoData()
            proto_data.rows = proto_rows
            request.proto_rows = proto_data

            future = target_stream_manager.send_appendrowsrequest(request)

            # Wait for the append row requests to finish.
            future.result()

Enter fullscreen mode Exit fullscreen mode

Conclusion

This API is promising but so more difficult to integrate in Python app than usual ! Hopefully, Google will publish a more high-level client library in the future.

If it's not the case, I hope that at least I spared you some headaches with this API usage.

Thanks for reading! I’m Matthieu, data engineer at Stack Labs.
If you want to discover the Stack Labs Data Platform or join an enthousiast Data Engineering team, please contact us.

Photo by Emily Bernal on Unsplash

Top comments (5)

Collapse
 
eugenes1991 profile image
EugeneS1991

Hi @matthieucham, Can you please help me.
I use your example in my code, but I need use it in async type, because i have async application.
I don`t understand, how and what i must change in your code for use
async/await in function and use BigQueryWriteAsyncClient for async.
Can you please write example for BigQueryWriteAsyncClient, how do you do that for BigQueryWriteClient ?

Please ))

Collapse
 
matthieucham profile image
matthieucham

Hi Eugene, sorry about the delay, have you solved your issue ?

Collapse
 
alfredjoy profile image
Alfred Joy • Edited

Hi @matthieucham
I was using your script for my data pipeline(Kafka to bq), because of high memory utilization the application is OOM killed.
Can you please update the code with the committed type and implement in async/await functions

Collapse
 
dbrtly profile image
Daniel Bartley

For trick 14, please do an example of how to use your wrapper library.

Collapse
 
matthieucham profile image
matthieucham

Hi Daniel, it's simple as calling
bq_manager. write_rows(protobuf_rows)
where protobuf_rows is a list of protobuf objects based on the descriptor which was passed to bq_manager's init.