DEV Community


Python Data Streaming to Google Cloud Storage with Resumable Uploads

Seth Michael Larson

A few days ago I spent a large chunk of my afternoon working on implementing memory-efficient data streaming to Google Cloud Storage (GCS) from a Python runtime.

There were several roadblocks along the way and I'd like to create the documentation that I wish I could find while working on the issue.

This article uses Python 3.6.4 but can be adapted for other Python versions.

GCS support in google-cloud Module

The google-cloud package is a giant collection of modules that can be used to interface with all of the Google Cloud Platform services, so it's a great place to start.

python -m pip install -U google-cloud

Within the google-cloud package is a module called google.cloud.storage which deals with all things GCS.

I downloaded my service account key, set up GOOGLE_APPLICATION_CREDENTIALS locally, and opened up a Python console to test out some of the functionality. I was able to quickly connect to GCS, create a Bucket, create a Blob, and upload binary data to the Blob.
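For reference, the client library discovers credentials through an environment variable pointing at the service account key file; a typical setup looks like this (the JSON path below is a placeholder, not a real file from this article):

```shell
# Point the google-cloud client at a service account key file.
# /path/to/service-account.json is a placeholder for your real key path.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
echo "$GOOGLE_APPLICATION_CREDENTIALS"
```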

from google.cloud import storage

client = storage.Client()
bucket = client.create_bucket('test-bucket')
blob = bucket.blob('test-blob')

blob.upload_from_string(
    data=b'x' * 1024,
    content_type='application/octet-stream',
    client=client
)

One thing I immediately noticed was that my use-case for building Armonaut would be progressively streaming output to GCS without first saving it to the file-system of the compute instance. There had to be a way to stream data rather than upload it all in one go.

Resumable Uploads to the Rescue!

The initial research I did uncovered Resumable Uploads as an option for Google Cloud Storage. The documentation says they fit the following use-cases:

  • You are uploading a large file.
  • The chances of network failure are high.
  • You don't know the size of the file when the upload starts.

The first and third reasons both applied to my use-case, so I started investigating further.

I searched the google-cloud documentation for a mention of resumable uploads which yielded the Blob.create_resumable_upload_session() method. This method starts a Resumable Upload and returns a URL.

Resumable Media Package

The set of interactions that must occur for a Resumable Upload to complete successfully is quite complex, and I suspected there was already a package that handles this exchange. I found the google-resumable-media package with a bit of Googling. ;-)

python -m pip install -U google-resumable-media

The key part of this package I was interested in is the google.resumable_media.requests.ResumableUpload class, which takes an authorized transport and then lets you upload data in chunks and recover when errors are detected.

So far this was the code I was working with:

import io
from google.auth.transport.requests import AuthorizedSession
from google.cloud import storage
from google.resumable_media.requests import ResumableUpload

chunk_size = 256 * 1024  # Minimum chunk-size supported by GCS
stream = io.BytesIO(b'x' * (1024 * 1024))  # Fake data stream

client = storage.Client()
bucket = client.bucket('test-bucket')
blob = bucket.blob('test-blob')

# Create a Resumable Upload
url = blob.create_resumable_upload_session(
    content_type='application/octet-stream',
    client=client
)

# Pass the URL off to the ResumableUpload object
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name}
)

The problem was that I was getting an error on upload.initiate(): it complained that there was no Location header on the response. I investigated this issue and found that create_resumable_upload_session() was already doing the work of upload.initiate()! I removed that step and instead used the API endpoint provided in the Resumable Upload documentation.

# Create a Resumable Upload
url = (
    f'https://www.googleapis.com/upload/storage/v1/b/'
    f'{bucket.name}/o?uploadType=resumable'
)
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name}
)

This snippet worked to start a Resumable Upload! Now to stream the data.

Streaming Data and stream_final=False

The ResumableUpload object has a method called transmit_next_chunk which tells the upload that the next chunk may be uploaded. While reading the documentation for this method I found stream_final, a parameter of the ResumableUpload.initiate method.

I found that if stream_final is set to False, the ResumableUpload detects the "end" of the stream when a chunk smaller than the chunk_size set in its constructor is transmitted. This means that to stream an unknown amount of data, output has to be buffered until at least chunk_size (256KiB is the minimum GCS supports) has accumulated before each chunk is transmitted.
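The detection rule can be illustrated with a small self-contained sketch, with no network involved. Note that chunk_writes and the 256-byte chunk size here are illustrative only (real GCS chunks must be multiples of 256KiB):

```python
# Sketch of the buffering rule: writes accumulate in a buffer, full chunks
# of exactly `chunk_size` are emitted as they become available, and the
# short (or empty) final chunk is what signals end-of-stream.

def chunk_writes(writes, chunk_size):
    buffer = b''
    chunks = []
    for data in writes:
        buffer += data
        while len(buffer) >= chunk_size:
            chunks.append(buffer[:chunk_size])  # full chunk: upload continues
            buffer = buffer[chunk_size:]
    chunks.append(buffer)  # short final chunk: upload finishes
    return chunks

chunks = chunk_writes([b'x' * 300] * 3, chunk_size=256)
print([len(c) for c in chunks])  # [256, 256, 256, 132]
```

Every chunk except the last has exactly chunk_size bytes, so the server knows the stream is finished only when it receives the undersized final chunk.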


Putting it All Together

After getting a simple example working I created a class that handles a single stream of unknown length data being uploaded to a blob progressively and recovers from network errors if detected.

To accomplish this I implemented an object that both buffered data and had a file-like interface in order for it to be used by ResumableUpload as a stream and be passed into other functions that require file-like objects for writing data.

Here is my final implementation:

from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from google.cloud import storage

class GCSObjectStreamUpload(object):
    def __init__(
            self,
            client: storage.Client,
            bucket_name: str,
            blob_name: str,
            chunk_size: int = 256 * 1024,
        ):
        self._client = client
        self._bucket = self._client.bucket(bucket_name)
        self._blob = self._bucket.blob(blob_name)

        self._buffer = b''
        self._buffer_size = 0
        self._chunk_size = chunk_size
        self._read = 0

        self._transport = AuthorizedSession(
            credentials=self._client._credentials
        )
        self._request = None  # type: requests.ResumableUpload

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, *_):
        if exc_type is None:
            self.stop()

    def start(self):
        url = (
            f'https://www.googleapis.com/upload/storage/v1/b/'
            f'{self._bucket.name}/o?uploadType=resumable'
        )
        self._request = requests.ResumableUpload(
            upload_url=url, chunk_size=self._chunk_size
        )
        self._request.initiate(
            transport=self._transport,
            content_type='application/octet-stream',
            stream=self,
            stream_final=False,
            metadata={'name': self._blob.name},
        )

    def stop(self):
        self._request.transmit_next_chunk(self._transport)

    def write(self, data: bytes) -> int:
        data_len = len(data)
        self._buffer_size += data_len
        self._buffer += data
        del data
        while self._buffer_size >= self._chunk_size:
            try:
                self._request.transmit_next_chunk(self._transport)
            except common.InvalidResponse:
                self._request.recover(self._transport)
        return data_len

    def read(self, chunk_size: int) -> bytes:
        # I'm not good with efficient no-copy buffering so if this is
        # wrong or there's a better way to do this let me know! :-)
        to_read = min(chunk_size, self._buffer_size)
        memview = memoryview(self._buffer)
        self._buffer = memview[to_read:].tobytes()
        self._read += to_read
        self._buffer_size -= to_read
        return memview[:to_read].tobytes()

    def tell(self) -> int:
        return self._read

The class can be used like so:

client = storage.Client()

with GCSObjectStreamUpload(client=client, bucket_name='test-bucket', blob_name='test-blob') as s:
    for _ in range(1024):
        s.write(b'x' * 1024)

Thanks for reading!

Discussion (2)

Yannick Einsweiler

Very nice, Seth!
This is something I had wished the "official" client library would cover, but it got no traction. We turned to the gcsfs library for the same purpose, but yours is based on the official library, so it's better supported.

Also worth noting: google-resumable-media includes built-in exponential backoff, so it will retry uploading a chunk if the connection is lost.

Harsha

I have query output and I need to put it in Google Cloud Storage using Python. Can you suggest how to do it, or show me some sample code to push the data into a Google Cloud Storage blob?