A few days ago I spent a large chunk of my afternoon working on implementing memory-efficient data streaming to Google Cloud Storage (GCS) from a Python runtime.
There were several roadblocks along the way, and I'd like to write the documentation I wish I'd been able to find while working on the issue.
This article uses Python 3.6.4 but can be adapted for other Python versions.
GCS support in the google-cloud Module
The google-cloud package is a giant collection of modules that can be used to interface with all of the Google Cloud Platform services, so it's a great place to start.
python -m pip install -U google-cloud
Within the google-cloud package is a module called google.cloud.storage which deals with all things GCS.
I downloaded and set up my GOOGLE_APPLICATION_CREDENTIALS locally and opened up a Python console to test out some of the functionality. I was able to quickly connect to GCS, create a Bucket, create a Blob, and upload binary data to the Blob.
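(For reference, here's a minimal sketch of wiring up those credentials before creating a client; the key path below is just a placeholder.)

import os

from google.cloud import storage

# Hypothetical key path; GOOGLE_APPLICATION_CREDENTIALS just needs to point
# at the downloaded service-account JSON key before the client is created.
# The variable can also be exported in the shell instead.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account.json'

client = storage.Client()

With that in place, the quick console test looked like this: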
from google.cloud import storage

client = storage.Client()
bucket = client.create_bucket('test-bucket')
blob = bucket.blob('test-blob')
blob.upload_from_string(
    data=b'x' * 1024,
    content_type='application/octet-stream',
    client=client
)
One thing I immediately noticed was that for building Armonaut, my use-case was to progressively stream output to GCS without saving it to the file-system of the compute instance. There had to be a way to stream data rather than upload it all in one go.
Resumable Uploads to the Rescue!
The initial research I did uncovered Resumable Uploads as an option for Google Cloud Storage. Their documentation lists the following use-cases:
- You are uploading a large file.
- The chances of network failure are high.
- You don't know the size of the file when the upload starts.
Reasons #1 and #3 both applied to my use-case so I started investigating further.
I searched the google-cloud documentation for a mention of resumable uploads, which yielded the Blob.create_resumable_upload_session() method. This method starts a Resumable Upload and returns a URL.
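Called on its own it looks roughly like this (a sketch; the bucket and blob names are just placeholders):

from google.cloud import storage

client = storage.Client()
blob = client.bucket('test-bucket').blob('test-blob')

# Starts a resumable upload session and returns its session URL
session_url = blob.create_resumable_upload_session(
    content_type='application/octet-stream'
)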
Resumable Media Package
The set of interactions that must occur for a Resumable Upload to complete successfully was quite complex, and I suspected there was already a package that handles this exchange. I found the google-resumable-media package with a bit of Googling. ;-)
python -m pip install -U google-resumable-media
The key part of this package I was interested in was the google.resumable_media.requests.ResumableUpload class, which takes an authorized transport and then allows you to upload data in chunks and recover when errors are detected.
So far this was the code I was working with:
import io

from google.auth.transport.requests import AuthorizedSession
from google.cloud import storage
from google.resumable_media.requests import ResumableUpload

chunk_size = 256 * 1024  # Minimum chunk-size supported by GCS
stream = io.BytesIO(b'x' * (1024 * 1024))  # Fake data stream

client = storage.Client()
bucket = client.bucket('test-bucket')
blob = bucket.blob('test-blob')

# Create a Resumable Upload
url = blob.create_resumable_upload_session(
    content_type='application/octet-stream',
    client=client
)

# Pass the URL off to the ResumableUpload object
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name}
)
The problem was that I was getting an error on upload.initiate(). It was complaining that there was no Location header on the response. I investigated and found that create_resumable_upload_session() was already doing the work of upload.initiate()! I removed that step and instead used the API endpoint given in the Resumable Upload documentation.
# Create a Resumable Upload
url = (
    f'https://www.googleapis.com/upload/storage/v1/b/'
    f'{bucket.name}/o?uploadType=resumable'
)
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name}
)
This snippet worked to start a Resumable Upload! Now to stream the data.
Streaming Data and stream_final=False
The ResumableUpload object has a method called transmit_next_chunk which tells the upload that the next chunk may be uploaded. While reading the documentation for this method I found stream_final, a parameter of the ResumableUpload.initiate method.
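As a quick sketch, with the upload, transport, and 1 MiB stream from the snippet above, draining a known-size stream is just a loop:

# Transmit 256 KiB chunks until the ResumableUpload reports it is finished.
while not upload.finished:
    upload.transmit_next_chunk(transport)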
I found that if stream_final is set to False, the ResumableUpload will detect the "end" of the stream when a chunk is transmitted that is smaller than the chunk_size parameter set in its constructor. This meant that to stream an unknown amount of data, every chunk would have to be at least 256KiB, and output would have to be buffered until that size was reached before being transmitted.
Putting it All Together
After getting a simple example working, I created a class that handles a single stream of unknown-length data being uploaded to a blob progressively and recovers from network errors when they are detected.
To accomplish this I implemented an object that both buffers data and has a file-like interface, so that it can be used by ResumableUpload as a stream and also be passed into other functions that require file-like objects for writing data.
Here is my final implementation:
from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from google.cloud import storage


class GCSObjectStreamUpload(object):
    def __init__(
        self,
        client: storage.Client,
        bucket_name: str,
        blob_name: str,
        chunk_size: int = 256 * 1024
    ):
        self._client = client
        self._bucket = self._client.bucket(bucket_name)
        self._blob = self._bucket.blob(blob_name)

        self._buffer = b''
        self._buffer_size = 0
        self._chunk_size = chunk_size
        self._read = 0

        self._transport = AuthorizedSession(
            credentials=self._client._credentials
        )
        self._request = None  # type: requests.ResumableUpload

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, *_):
        if exc_type is None:
            self.stop()

    def start(self):
        url = (
            f'https://www.googleapis.com/upload/storage/v1/b/'
            f'{self._bucket.name}/o?uploadType=resumable'
        )
        self._request = requests.ResumableUpload(
            upload_url=url, chunk_size=self._chunk_size
        )
        self._request.initiate(
            transport=self._transport,
            content_type='application/octet-stream',
            stream=self,
            stream_final=False,
            metadata={'name': self._blob.name},
        )

    def stop(self):
        # The remaining buffered data is smaller than chunk_size, so this
        # final short chunk signals the end of the upload.
        self._request.transmit_next_chunk(self._transport)

    def write(self, data: bytes) -> int:
        data_len = len(data)
        self._buffer_size += data_len
        self._buffer += data
        del data
        # Only transmit once at least one full chunk has been buffered.
        while self._buffer_size >= self._chunk_size:
            try:
                self._request.transmit_next_chunk(self._transport)
            except common.InvalidResponse:
                self._request.recover(self._transport)
        return data_len

    def read(self, chunk_size: int) -> bytes:
        # I'm not good with efficient no-copy buffering so if this is
        # wrong or there's a better way to do this let me know! :-)
        to_read = min(chunk_size, self._buffer_size)
        memview = memoryview(self._buffer)
        self._buffer = memview[to_read:].tobytes()
        self._read += to_read
        self._buffer_size -= to_read
        return memview[:to_read].tobytes()

    def tell(self) -> int:
        return self._read
The class can be used like so:
client = storage.Client()

with GCSObjectStreamUpload(client=client, bucket_name='test-bucket', blob_name='test-blob') as s:
    for _ in range(1024):
        s.write(b'x' * 1024)
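Because the object exposes a file-like write() method, it can also be handed to code that expects a writable binary file. For example (the local filename here is just a placeholder), copying a local file into a blob with shutil:

import shutil

client = storage.Client()

with GCSObjectStreamUpload(client=client, bucket_name='test-bucket', blob_name='test-blob') as s:
    with open('some-large-file.bin', 'rb') as f:
        # copyfileobj only needs a .write() method on the destination
        shutil.copyfileobj(f, s, length=256 * 1024)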
Thanks for reading!
Top comments (3)
Very nice Seth! Something I had wished the "official" client library would have covered, but it had no traction. We turned to the gcsfs library for the same, but yours is based on the former lib so better supported. Also worth noting is that google-resumable-media includes a built-in exponential backoff retry, so it will retry uploading the chunk if the connection is lost.

I have a query output and I need to put that output in Google Cloud Storage using Python. Can you suggest how to do it, or show me some sample code to push the data into a Google Cloud Storage blob?

Thank you so much Seth. Can you kindly do another one for a download using google-resumable-media?