If you're trying to use Apache Airflow to copy large objects in S3, you might have encountered issues where S3 complains about you sending an InvalidRequest
. We will fix that in this post by writing a custom operator to handle the underlying problem. Before we do that, let's first understand where this issue originated.
In case you need a primer on Airflow, check out our post Understanding Apache Airflow on AWS, but if you do - what are you doing here?
The error message that led you to this post looks something like this:
An error occurred (InvalidRequest) when calling the CopyObject operation: The specified copy source is larger than the maximum allowable size for a copy source [...]
And the problematic code that caused it may look something like that. Although this code is not problematic in itself. For smaller objects, it works perfectly fine.
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
op = S3CopyObjectOperator(
task_id="copy_source_file",
source_bucket_name="source-bucket",
source_bucket_key="path/to_large_object",
dest_bucket_name="destination-bucket",
dest_bucket_key="save-me-here",
)
The name of the operator indicates the underlying API call, which is CopyObject
. In general, CopyObject
is pretty robust, but there's a failure mode that's not immediately obvious - it is limited to copying objects up to 5GB in a single operation. That means attempting to copy any object larger than that leads to the aforementioned error message.
This is mentioned in the first big info box in the docs, but who has time to read those, right? It even includes a link to the proposed solution, which means using multipart uploads to achieve this. Before you groan - no, you won't have to handle splitting up stuff into byte ranges and managing the individual copy operations. All of that comes included in your installation of boto3 in the form of the S3 client's copy
operation.
The copy method uses the underlying s3transfer library that ships with boto3 and transparently manages the multipart uploads. I should note here, that despite the name being "multipart upload", we don't need to download the object first before uploading it again, it uses the UploadPartCopy
API, which keeps the data internal to S3.
Using this is as simple as subclassing the existing S3CopyObjectOperator
and overwriting the execute
method, which Airflow calls to perform the actual operation. In the code below, you can see that I try to delegate the API call to the parent class and only become active if there's an exception.
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from botocore.exceptions import ClientError
class S3CopyOperator(S3CopyObjectOperator):
"""
An extension of the S3CopyObjectOperator that can copy
objects larger than 5GB.
"""
def execute(self, context: Context):
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
try:
super().execute(context)
except ClientError as err:
if err.response["Error"]["Code"] == "InvalidRequest":
# The response when we try to copy more than 5GB in one request.
s3_hook.conn.copy(
CopySource={
"Bucket": self.source_bucket_name,
"Key": self.source_bucket_key,
},
Bucket=self.dest_bucket_name,
Key=self.dest_bucket_key,
)
else:
raise err
The error code InvalidRequest
is used to denote this specific error, although confusingly it's not listed in the API docs for CopyObject
but as part of the generic error responses documentation. If we get this error, we use the conn
object of an S3Hook
, which is basically a wrapper around boto3
to call the appropriate copy
method with the parameters from our instance. You can use it like this:
op = S3CopyOperator(
task_id="copy_source_file",
source_bucket_name="source-bucket",
source_bucket_key="path/to_large_object",
dest_bucket_name="destination-bucket",
dest_bucket_key="save-me-here",
)
This approach means the API doesn't change, i.e., you can just replace the S3CopyObjectOperator
instances with S3CopyOperator
instances. Additionally, we only perform the extra work of doing the multipart upload when the simpler method is insufficient. The trade-off is that we're inefficient if almost every object is larger than 5GB because we're doing a "useless" API call first. As usual, it depends. A similar approach has been discussed in this Github Issue in the Airflow repository.
β Maurice
Photo by K. Mitch Hodge on Unsplash
Top comments (0)