To stream data into a BigQuery table programmatically, Google is promoting a new API: the Storage Write API.
As a consequence, the previous API and its well-known tabledata.insertAll method are now called the "legacy streaming API", which does not sound very appealing when starting a new project.
Indeed, as stated in the official Google Cloud documentation:
For new projects, we recommend using the BigQuery Storage Write API instead of the tabledata.insertAll method.
Moreover, the new API is advertised with a lower price and new features such as exactly-once delivery.
Exciting, isn't it?
Well, it is, but the Python client wrapping the new API is very bare-metal, and using it does not feel pythonic at all. As a consequence, integrating this API is much more difficult than with other Google Cloud clients, which is quite surprising. Having recently completed such an integration, I can speak from experience: I faced an unusually high number of challenges getting the Storage Write API into a Python application, for the most common use case: writing data rows into a BigQuery table.
This article lists these issues and aims to help future developers overcome them.
Describe the target schema with Protobuf
Protocol Buffers, aka Protobuf, is the portable exchange format widely used across Google Cloud APIs. It is usually hidden in the implementation when client libraries are used. With the new streaming API, however, you will have to dive into it.
Protobuf relies on a schema descriptor, which describes how the exchanged data is structured. The descriptor is written as a .proto text file listing all fields, their types and their cardinality.
The first thing to do when integrating the Storage Write API is to write the proto file. The message description must match the schema of the target BigQuery table:
- same field names (case insensitive)
- same structure (nested types are supported)
- compatible field types (see the type mapping table here)
Trick #1: use proto2 syntax
Protobuf now exists in two flavours: proto2 and proto3, the newer one.
proto2 works well with BigQuery, whereas proto3, which is fairly recent, still has some issues. Moreover, all examples provided by GCP currently use proto2. So for now, I recommend sticking to this version.
Trick #2: all your fields are optional
In proto2 syntax you can declare a field as optional or required. This possibility was removed from proto3 (optional is implicit). In proto2, Google now recommends declaring all your fields as optional, even if they are REQUIRED in the BigQuery schema. However, you will still see some required fields in GCP examples like here.
Trick #3: auto-generate the proto file
Writing a .proto descriptor can be very tedious if the target schema has many columns with deeply nested structures: don't forget that the descriptor has to match the target schema exactly!
You can ease the pain by auto-generating part of the proto file from the BigQuery schema. First, download the target schema with bq:
bq show --schema --format=prettyjson project_name:dataset_name.target_table_name > schema_target_table.json
Then use some scripting to convert the downloaded schema file into a proto. ParseToProtoBuffer.py, courtesy of matthiasa4, is useful for inspiration.
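If you prefer to roll your own, here is a minimal sketch of the idea (my own illustration, not the script mentioned above; it only handles flat schemas with a handful of scalar types, and nested RECORD fields would need recursion):

import json

# Assumed mapping from BigQuery types to proto2 types; extend it to your needs.
BQ_TO_PROTO = {
    "STRING": "string",
    "INTEGER": "int64",
    "FLOAT": "double",
    "BOOLEAN": "bool",
    "TIMESTAMP": "int64",  # see Trick #4 below
}

def bq_schema_to_proto(schema_path: str, message_name: str) -> str:
    """Turn a bq JSON schema file into a proto2 message definition."""
    with open(schema_path, encoding="utf-8") as schema_file:
        fields = json.load(schema_file)
    lines = ['syntax = "proto2";', f"message {message_name} {{"]
    for index, field in enumerate(fields, start=1):
        label = "repeated" if field.get("mode") == "REPEATED" else "optional"
        proto_type = BQ_TO_PROTO[field["type"]]
        lines.append(f"  {label} {proto_type} {field['name'].lower()} = {index};")
    lines.append("}")
    return "\n".join(lines)

print(bq_schema_to_proto("schema_target_table.json", "Mymessage"))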
Trick #4: BigQuery TIMESTAMP fields are protobuf int64
Even though Protobuf provides a timestamp data type, the best way to send a timestamp value to BigQuery is to declare an int64 field.
Set the field value to the epoch timestamp in microseconds and it will be automatically converted into a BigQuery TIMESTAMP.
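For example, assuming mymsg is an instance of a message with an int64 field named mytimestamp (a hypothetical field name), you would set it like this:

from datetime import datetime, timezone

# Epoch timestamp in microseconds, as expected for a BigQuery TIMESTAMP column
mymsg.mytimestamp = int(datetime.now(timezone.utc).timestamp() * 1_000_000)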
Generate Python Protobuf objects
The next step is to generate Python code from the .proto file.
Trick #5: install or upgrade protoc
Ensure you have installed the latest version of protoc, the Protobuf compiler.
This tool generates code artefacts from proto files in several languages; of course, we pick Python. Invoke protoc like this:
protoc -I=. --python_out=. schema_target_table.proto
The outcome is a file named schema_target_table_pb2.py
The content of the generated file is surprising: it appears to lack a lot of definitions! The reason is that the missing parts are dynamically inserted at runtime by the Protobuf Python library. As a consequence:
- your IDE will be mad at you
- Pylint will insult you
- you have to take a guess about the missing definition names
Come on Google, are you serious?
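One small consolation: instead of guessing, you can inspect the generated module at runtime to see which names were actually injected. A quick sketch:

import schema_target_table_pb2

# Print the message classes dynamically added by the Protobuf runtime
print([name for name in dir(schema_target_table_pb2) if not name.startswith("_")])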
Trick #6: make _pb2.py files pass Pylint
Simply put the following line at the top of each _pb2 file, and Pylint will leave you alone:
# pylint: skip-file
Trick #7: Import the missing classes
The generated classes follow these rules:
- They have the same name as the message
- A nested type is accessible as a class attribute of its parent type's class
Let's illustrate. The following proto file
syntax = "proto2";
message Mymessage {
optional string myfield = 1;
message Mysubmessage {
optional string mysubfield = 1;
}
optional Mysubmessage mycomplexfield = 2;
repeated string collection = 3;
}
will generate Python classes which can be imported like this:
from .schema_target_table_pb2 import Mymessage
submessage_instance = Mymessage.Mysubmessage()
Needless to say, your IDE will turn red because of these imports. You will have to tame Pylint too:
# pylint: disable=no-name-in-module
Set Protobuf object fields
Filling in proto fields is quite counterintuitive. Thankfully, Google provides exhaustive documentation about it.
Here is a straight-to-the-point TL;DR:
Trick #8: Simple (scalar) type fields can be directly assigned
mymsg = Mymessage()
mymsg.myfield = "toto"
Trick #9: Use CopyFrom for nested type fields
Yes, CopyFrom(), a Python method name in CamelCase starting with an uppercase letter. Come on, Google!
Anyway, you cannot assign a complex field directly:
mymsg = Mymessage()
mymsg.myfield = "toto"
mysubmsg = Mymessage.Mysubmessage()
mymsg.mycomplexfield.CopyFrom(mysubmsg)
Trick #10: Use append for repeated fields
You must not instantiate an empty list; just append to the field as if it already existed:
mymsg = Mymessage()
mymsg.collection.append("titi")
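You can also add several values at once with extend(). For a repeated message field (not present in the example proto above, mentioned only for completeness), you would call add() on the field instead of appending:

mymsg = Mymessage()
# Append several values at once to a repeated scalar field
mymsg.collection.extend(["titi", "toto"])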
Store Protobuf objects into BigQuery
The next step is to store the Protobuf objects into BigQuery. Here again, there are a few tricks:
Trick #11: Be a dataEditor
The user or service account performing the write must have the bigquery.tables.updateData permission on the target table.
This permission is included in the bigquery.dataEditor role.
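For example, the role can be granted to a service account at the project level with gcloud (a broad grant, shown with placeholder names; a dataset-level or table-level grant is more restrictive and usually preferable):

gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:my-sa@my-project.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"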
Trick #12: Don't set a package name in the proto file
In many proto file samples, a package directive is set:
package foo.bar;
message Mymessage{...
Packages exist to avoid name clashes. But they are not really useful in Python (generated classes are identified by their file path), and moreover package names are not supported by BigQuery in nested message types when storing.
So, just don't set a package.
Trick #13: abstract the storage in a manager
The Protobuf object is ready to be inserted at last! Adapt the snippet given by Google to your own code to perform the storage. As you can see, it's not really a one-liner: more than 20 lines are necessary just to set up the destination stream. Besides, each append operation requires an AppendRowsRequest to be created, which is tedious too.
It's a good idea to wrap all these tasks in a practical Manager class for your application to use. Here is an example implementation:
"""Wrapper around BigQuery call."""
from __future__ import annotations
from typing import Any, Iterable
import logging
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions
from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2
from google.protobuf.descriptor import Descriptor
class DefaultStreamManager: # pragma: no cover
"""Manage access to the _default stream write streams."""
def __init__(
self,
table_path: str,
message_protobuf_descriptor: Descriptor,
bigquery_storage_write_client: bigquery_storage.BigQueryWriteClient,
):
"""Init."""
self.stream_name = f"{table_path}/_default"
self.message_protobuf_descriptor = message_protobuf_descriptor
self.write_client = bigquery_storage_write_client
self.append_rows_stream = None
def _init_stream(self):
"""Init the underlying stream manager."""
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The initial request must contain the stream name.
request_template.write_stream = self.stream_name
# So that BigQuery knows how to parse the serialized_rows, generate a
# protocol buffer representation of our message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto() # pylint: disable=no-member
self.message_protobuf_descriptor.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Create an AppendRowsStream using the request template created above.
self.append_rows_stream = writer.AppendRowsStream(
self.write_client, request_template
)
def send_appendrowsrequest(
self, request: types.AppendRowsRequest
) -> writer.AppendRowsFuture:
"""Send request to the stream manager. Init the stream manager if needed."""
try:
if self.append_rows_stream is None:
self._init_stream()
return self.append_rows_stream.send(request)
except bqstorage_exceptions.StreamClosedError:
# the stream needs to be reinitialized
self.append_rows_stream.close()
self.append_rows_stream = None
raise
# Use as a context manager
def __enter__(self) -> DefaultStreamManager:
"""Enter the context manager. Return the stream name."""
self._init_stream()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Exit the context manager : close the stream."""
if self.append_rows_stream is not None:
# Shutdown background threads and close the streaming connection.
self.append_rows_stream.close()
class BigqueryWriteManager:
"""Encapsulation for bigquery client."""
def __init__(
self,
project_id: str,
dataset_id: str,
table_id: str,
bigquery_storage_write_client: bigquery_storage.BigQueryWriteClient,
pb2_descriptor: Descriptor,
): # pragma: no cover
"""Create a BigQueryManager."""
self.bigquery_storage_write_client = bigquery_storage_write_client
self.table_path = self.bigquery_storage_write_client.table_path(
project_id, dataset_id, table_id
)
self.pb2_descriptor = pb2_descriptor
def write_rows(self, pb_rows: Iterable[Any]) -> None:
"""Write data rows."""
with DefaultStreamManager(
self.table_path, self.pb2_descriptor, self.bigquery_storage_write_client
) as target_stream_manager:
proto_rows = types.ProtoRows()
# Create a batch of row data by appending proto2 serialized bytes to the
# serialized_rows repeated field.
for row in pb_rows:
proto_rows.serialized_rows.append(row.SerializeToString())
# Create an append row request containing the rows
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
future = target_stream_manager.send_appendrowsrequest(request)
# Wait for the append row requests to finish.
future.result()
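Using the manager then boils down to a few lines. Here is a sketch (the Mymessage class and the project, dataset and table names are placeholders):

from google.cloud import bigquery_storage

from schema_target_table_pb2 import Mymessage  # pylint: disable=no-name-in-module

write_client = bigquery_storage.BigQueryWriteClient()
bq_manager = BigqueryWriteManager(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="target_table",
    bigquery_storage_write_client=write_client,
    pb2_descriptor=Mymessage.DESCRIPTOR,
)

row = Mymessage()
row.myfield = "toto"
bq_manager.write_rows([row])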
Conclusion
This API is promising, but it is much more difficult to integrate into a Python app than usual! Hopefully, Google will publish a more high-level client library in the future.
If not, I hope I have at least spared you some headaches when using this API.
Thanks for reading! I’m Matthieu, data engineer at Stack Labs.
If you want to discover the Stack Labs Data Platform or join an enthusiastic Data Engineering team, please contact us.
Photo by Emily Bernal on Unsplash