AWS S3 is an industry-leading object storage service. We tend to store lots of data files on S3 and at times need to process these files. If the file we are processing is small, we can basically go with the traditional file processing flow, wherein we fetch the file from S3 and then process it row by row. But the question arises: what if the file is large, say > 1 GB? 😓
Importing (reading) a large file leads to an Out of Memory error. It can even crash the system. There are libraries such as Pandas, Dask, etc. which are very good at processing large files, but again the file has to be present locally, i.e. we would have to import it from S3 to our local machine. But what if we do not want to fetch and store the whole S3 file locally? 🤔
📜 Let's consider some of the use-cases:
- We want to process a large CSV S3 file (~2GB) every day. It must be processed within a certain time frame (e.g. in 4 hours).
- We are required to regularly process large S3 files that arrive from an FTP server. New files come in at certain intervals and have to be processed sequentially, i.e. the older file has to be processed before starting to process the newer files.
These are some very good scenarios where local processing may impact the overall flow of the system. Also, if we are running these file-processing units in containers, we have limited disk space to work with. Hence, a cloud streaming flow is needed (one which can also parallelize the processing of multiple chunks of the same file by streaming different chunks in parallel threads/processes). This is where I came across the AWS S3 Select feature. 😎
📝 This post focuses on streaming a large file into smaller manageable chunks (sequentially). This approach can then be used to parallelize the processing by running in concurrent threads/processes. Check my next post on this.
S3 Select
With Amazon S3 Select, you can use simple structured query language (SQL) statements to filter the contents of Amazon S3 objects and retrieve just the subset of data that you need. Using Amazon S3 Select to filter this data, you can reduce the amount of data that Amazon S3 transfers, reducing the cost and latency to retrieve this data.
Amazon S3 Select works on objects stored in CSV, JSON, or Apache Parquet format. It also works with objects that are compressed with GZIP or BZIP2 (for CSV and JSON objects only) and server-side encrypted objects. You can specify the format of the results as either CSV or JSON, and you can determine how the records in the result are delimited.
📝 We will be using Python's boto3 to accomplish our end goal.
🧱 Constructing SQL expressions
In order to work with S3 Select, boto3 provides the select_object_content() function to query S3. You pass SQL expressions to Amazon S3 in the request. Amazon S3 Select supports a subset of SQL. Check this link for more information on this.
```python
response = s3_client.select_object_content(
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    Expression='SELECT * FROM S3Object',
    InputSerialization={
        'CSV': {
            'FileHeaderInfo': 'USE',
            'FieldDelimiter': ',',
            'RecordDelimiter': '\n'
        }
    },
    OutputSerialization={
        'JSON': {
            'RecordDelimiter': ','
        }
    }
)
```
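The call above does not return plain rows directly; the response's Payload is an event stream. A minimal sketch of consuming it (assuming the response object from the request above) could look like this, where Records events carry the actual bytes and a Stats event summarizes the scan:

```python
# Minimal sketch of consuming the select_object_content() event stream,
# assuming `response` from the request above.
for event in response['Payload']:
    if 'Records' in event:
        # Raw bytes of the matching rows, serialized as per OutputSerialization
        print(event['Records']['Payload'].decode('utf-8'))
    elif 'Stats' in event:
        details = event['Stats']['Details']
        print(f"Scanned={details['BytesScanned']}, Returned={details['BytesReturned']}")
    elif 'End' in event:
        print('Result stream reached the end')
```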
In this request, InputSerialization determines the S3 file type and related properties, while OutputSerialization determines the format of the response that we get out of select_object_content().
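The serialization blocks are also where compression and other input formats are declared. As a hedged sketch of the boto3 parameter shape (not used in the rest of this post), an InputSerialization for a GZIP-compressed CSV or a line-delimited JSON file would look roughly like this:

```python
# Hypothetical InputSerialization variants (not used in the demo itself)

# GZIP-compressed CSV input
gzip_csv_input = {
    'CSV': {'FileHeaderInfo': 'USE', 'FieldDelimiter': ',', 'RecordDelimiter': '\n'},
    'CompressionType': 'GZIP'
}

# Line-delimited JSON input (one JSON object per line)
json_lines_input = {
    'JSON': {'Type': 'LINES'},
    'CompressionType': 'NONE'
}
```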
🌫️ Streaming Chunks
Now that we have some idea of how S3 Select works, let's try to accomplish our use-case of streaming chunks (subsets) of a large file, much like how a paginated API works. 😋
S3 Select supports the ScanRange parameter, which helps us stream a subset of an object by specifying a range of bytes to query. We can issue a series of non-overlapping scan ranges against the same object. Scan ranges don't need to be aligned with record boundaries: a record that starts within the specified scan range but extends beyond it will still be processed by the query. In other words, a row that begins inside the scan range is fetched in full, even if it extends past the range. S3 Select never returns a subset of a row; either the whole row is fetched or it is skipped (to be fetched in another scan range).
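To make the "pagination" idea concrete, here is a tiny standalone sketch (independent of the streaming code below) of how consecutive, non-overlapping byte windows can walk over a file of a given size:

```python
def byte_windows(file_size: int, chunk_bytes: int):
    """Yield (start, end) byte ranges covering the whole file without overlap."""
    start = 0
    while start < file_size:
        end = min(start + chunk_bytes, file_size)
        yield start, end
        start = end

# e.g. a 2050-byte file scanned in 1000-byte windows
print(list(byte_windows(2050, 1000)))  # [(0, 1000), (1000, 2000), (2000, 2050)]
```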
Let's try to achieve this in 2 simple steps:
1. Find the total bytes of the S3 file
The following code snippet showcases a function that performs a HEAD request on our S3 file and determines its size in bytes.
```python
import logging

import boto3
from botocore.exceptions import ClientError
from flask import current_app  # the demo runs as a Flask app; adapt the config lookup to your setup

logger = logging.getLogger(__name__)  # placeholder; the demo project configures its own logger


def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of an S3 object via a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
```
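As a side note, the parsed head_object() response also exposes the size directly via its ContentLength field, so the following should be an equivalent way to read it:

```python
# Equivalent way to read the size (assuming s3_client, bucket and key as above)
response = s3_client.head_object(Bucket=bucket, Key=key)
file_size = int(response.get('ContentLength', 0))
```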
2. Create a generator to stream the chunks
Now, the logic is to keep yielding chunks of the byte stream of the S3 file until we reach the total file size. Rest assured, these consecutive scan ranges won't result in overlapping rows in the response 😉 (check the output image / GitHub repo). Simple enough, eh? 😝
```python
import ast
import logging

import boto3
from botocore.exceptions import ClientError
from flask import current_app  # the demo runs as a Flask app; adapt the config lookup to your setup

logger = logging.getLogger(__name__)  # placeholder; the demo project configures its own logger


def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams an S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        file_size (int): Total size of the S3 object in bytes
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000

    Yields:
        tuple[dict]: A tuple of dictionaries containing the rows of file content in each chunk
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )
        """
        The select_object_content() response is an event stream that can be looped over
        to concatenate the overall result set. Hence, we join the results of the stream
        into a string before converting it to a tuple of dicts.
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = '<s3-bucket>'
    key = '<s3-key>'
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
```
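Since the output serialization here is JSON, an alternative to ast.literal_eval would be to parse the joined payload with the standard json module, e.g. by wrapping the comma-delimited records in a JSON array. This is just a sketch of that idea, not what the demo repository does:

```python
import json


def parse_select_payload(joined_records: str) -> list:
    """Parse the comma-delimited JSON records returned by S3 Select into dicts.

    `joined_records` is the concatenated, decoded Records payload,
    e.g. '{"id": "1"},{"id": "2"},' (note the trailing delimiter).
    """
    trimmed = joined_records.rstrip(',')
    return json.loads(f'[{trimmed}]') if trimmed else []


print(parse_select_payload('{"id": "1"},{"id": "2"},'))  # [{'id': '1'}, {'id': '2'}]
```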
Congratulations! 👏 We have successfully managed to solve one of the key challenges of processing a large S3 file without crashing our system. 🤘
📌 You can check out my GitHub repository for a complete working example of this approach 👇
idris-rampurawala / s3-select-demo
This project showcases the rich AWS S3 Select feature to stream a large data file in a paginated style.
Currently, S3 Select does not support OFFSET, and hence we cannot paginate the results of the query; the demo therefore uses the ScanRange feature to stream the contents of the S3 file.
✔️ Benefits of using S3-Select
- Reduced IO — thus better performance
- Reduced costs due to smaller data transfer fees
- Multiple chunks can be run in parallel to expedite the file processing using ScanRange in multiple threads/processes (see the sketch after this list)
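As hinted in the last point above, a rough sketch of that parallel idea (my next post covers a real implementation) could hand independent scan ranges to a thread pool. Everything below, including process_scan_range, is a hypothetical placeholder:

```python
from concurrent.futures import ThreadPoolExecutor


def process_scan_range(bucket: str, key: str, start: int, end: int) -> None:
    # Hypothetical worker: issue one select_object_content() call with
    # ScanRange={'Start': start, 'End': end} and process the returned rows.
    ...


def process_in_parallel(bucket: str, key: str, file_size: int, chunk_bytes: int, workers: int = 4) -> None:
    """Split the file into non-overlapping byte windows and process them in threads."""
    ranges = []
    start = 0
    while start < file_size:
        end = min(start + chunk_bytes, file_size)
        ranges.append((start, end))
        start = end
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for start, end in ranges:
            executor.submit(process_scan_range, bucket, key, start, end)
```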
❗ Limitations of S3 Select
- The maximum length of a record in the input or result is 1 MB
- Amazon S3 Select can only emit nested data using the JSON output format
- S3 Select returns a stream of encoded bytes, so we have to loop over the returned stream and decode the output via records['Payload'].decode('utf-8')
- Only works on objects stored in CSV, JSON, or Apache Parquet format. For more flexibility/features, you can go for AWS Athena
📑 Resources
- My GitHub repository demonstrating the above approach
- AWS S3 Select boto3 reference
- AWS S3 Select userguide
- AWS S3 Select Example
You might also want to read the sequel to this post 👇
Top comments (4)
Hi Idris, great post! I tried to write similar code where I select all data from an S3 file and recreate that same file locally with the exact same format. But after building the file I noticed that the local file had fewer records than the original one. As I increased the chunk size of the scan range, the difference between the S3 file and the local file diminished. Do you have any idea why this might happen? Note: file format CSV, no compression, 5000 and 20000 bytes chunk range used for the tests. Once again, thank you for the post.
Hi,
Glad that you liked the post and that it helped you in your use-case.
With this process of streaming the data, you have to keep retrieving the file chunks from S3 until you reach the total file size. I would recommend cloning this repo and comparing it with your local code to identify if you missed something 😉
Optionally, I would also recommend checking out the sequel to this post for parallel processing 😁
Parallelize Processing a Large AWS S3 File
Thank you! I found out what I was missing: I had made start_byte = end_byte + 1, losing one row per chunk. Your next article was exactly what I was looking for for the next step of my program.
Is this the maximum file size of the file on S3?
Is there any size limit on the file that I want to "filter"?