DEV Community

loading...
Cover image for CLI upload for large files
Traindex

CLI upload for large files

mohsinashraf profile image Mohsin Ashraf Originally published at traindex.io ・5 min read

We deal with data every day as part of my work in the data science team. It starts by collecting data and analyzing it for potentially important features and baseline numbers. Then we do data preprocessing and cleaning. Finally, we feed the data into a machine learning algorithm for training.

Once the training is complete, we test the model. We then serve via an API if the performance is good.

In a previous article, we talked about uploading large files using multipart upload via pre-signed URLs. We will take a step further now and discuss how to create a CLI tool for uploading large files to S3 using pre-signed URLs.

The article comprises 3 parts, as described below:

  1. Create pre-signed URLs for multipart upload
  2. Upload all parts of the object
  3. Complete the upload

Request for Multipart upload pre-signed URLs

First of all, we have to request the pre-signed URLs to the AWS S3 bucket. It will return a list of pre-signed URLs corresponding with each of the object’s parts, along with a upload_id, which is associated with the object whose parts are being created. Let’s create the route for requesting pre-signed URLs.

from pathlib import Path
…
…

@app.route('/presigned',methods=['POST'])
def return_presigned():
    data = request.form.to_dict(flat=False)
    file_name = data['file_name'][0]
    file_size = int(data['file_size'][0])
    target_file = Path(file_name)
    max_size = 5 * 1024 * 1024
    upload_by = int(file_size / max_size) + 1
    bucket_name = "YOUR_BUCKET_NAME"
    key = file_name
    upload_id = s3util.start(bucket_name, key)
    urls = []
    for part in range(1, upload_by + 1):
           signed_url = s3util.create_presigned_url(part)
             urls.append(signed_url)
    return jsonify({
                     'bucket_name':bucket_name,
                     'key':key,
                     'upload_id':upload_id,
                'file_size:file_size,
                   'file_name':file_name,
                'max_size':max_size,
                     'upload_by':upload_by,
                'urls':urls
            })
Enter fullscreen mode Exit fullscreen mode

Let’s go through the code. In this route (Flask route), we get the information sent in the request: file_name and file_size.
The file_name will be used in creating URLs for parts of the object, and file_size will be used to find how many parts to create (pre-signed URLs to create).
In the route, max_size determines each part’s maximum size. You can change it according to your needs.
upload_by tells how many parts there will be for the object to upload.
bucket_name is the bucket you want to upload data in.
upload_id is generated using the S3 utility function create_multipart_upload, which we will discuss shortly.
After that, pre-signed URLs are created in the for loop using the create_presigned_url utility function of s3. Again, we will come back to it in a bit.
Next, I return the required data in JSON format.

Now, let’s talk about create_multipart_upload. It’s a utility function that helps me encapsulate the code so it’s more readable and manageable. Following is the code for the utility class.

import boto3
from botocore.exceptions import ClientError
from boto3 import Session


class S3MultipartUploadUtil:
    """
    AWS S3 Multipart Upload Uril
    """
    def __init__(self, session: Session):
        self.session = session
        self.s3 = session.client('s3')
        self.upload_id = None
        self.bucket_name = None
        self.key = None

    def start(self, bucket_name: str, key: str):
        """
        Start Multipart Upload
        :param bucket_name:
        :param key:
        :return:
        """
        self.bucket_name = bucket_name
        self.key = key
        res = self.s3.create_multipart_upload(Bucket=bucket_name, Key=key)
        self.upload_id = res['UploadId']
        logger.debug(f"Start multipart upload '{self.upload_id}'")
        return self.upload_id

    def create_presigned_url(self, part_no: int, expire: int=3600) -> str:
        """
        Create pre-signed URL for upload part.
        :param part_no:
        :param expire:
        :return:
        """
        signed_url = self.s3.generate_presigned_url(
            ClientMethod='upload_part',
            Params={'Bucket': self.bucket_name,
                    'Key': self.key,
                    'UploadId': self.upload_id,
                    'PartNumber': part_no},
            ExpiresIn=expire)
        logger.debug(f"Create presigned url for upload part '{signed_url}'")
        return signed_url

    def complete(self, parts,id,key,bucket_name):
        """
        Complete Multipart Uploading.
        `parts` is list of dictionary below.
        ```


        [ {'ETag': etag, 'PartNumber': 1}, {'ETag': etag, 'PartNumber': 2}, ... ]


        ```
        you can get `ETag` from upload part response header.
        :param parts: Sent part info.
        :return:
        """
        res = self.s3.complete_multipart_upload(
            Bucket=bucket_name,
            Key=key,
            MultipartUpload={
                'Parts': parts
            },
            UploadId=id
        )
        logger.debug(f"Complete multipart upload '{self.upload_id}'")
        logger.debug(res)
        self.upload_id = None
        self.bucket_name = None
        self.key = None
Enter fullscreen mode Exit fullscreen mode

In this class, I wrap the functionality of the S3 client to make it easy to use and less cluttered in the API file.

Once you get the response from the API, it would look something like this:
Code Snippet

You would download this response in a JSON file to upload the data using the CLI.

Upload all parts of the object

Now let’s turn to the CLI code, which uses this JSON file, and we assume that we save this file as presigned.json.

import requests
import progressbar
from pathlib import Path

def main():
    data = eval(open('presigned.json').read())
    upload_by = data['upload_by']
    max_size = data['max_size']
    urls = data['urls']
    target_file = Path(data['file_name'])
    file_size = data['file_size']
    key = data['key']
    upload_id = data['upload_id']
    bucket_name = data['bucket_name']
    bar = progressbar.ProgressBar(maxval=file_size, \
        widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()])
    json_object = dict()
    parts = []
    file_size_counter = 0
    with target_file.open('rb') as fin:
        bar.start()
        for num, url in enumerate(urls):
            part = num + 1
            file_data = fin.read(max_size)
            file_size_counter += len(file_data)
            res = requests.put(url, data=file_data)

            if res.status_code != 200:
                print (res.status_code)
                print ("Error while uploading your data.")
                return None
            bar.update(file_size_counter)
            etag = res.headers['ETag']
            parts.append((etag, part))
        bar.finish()
        json_object['parts'] = [
            {"ETag": eval(x), 'PartNumber': int(y)} for x, y in parts]
        json_object['upload_id'] = upload_id
        json_object['key'] = key
        json_object['bucket_name'] = bucket_name
    requests.post('https://YOUR_HOSTED_API/combine, json={'parts': json_object})
    print ("Dataset is uploaded successfully")

if __name__ == "__main__":
    main()    
Enter fullscreen mode Exit fullscreen mode

The above code loads the file and gets all the required information, including upload_id, URLs, and others. I use Progressbar to show progress while uploading the file. The entire code is pretty much self-explanatory except for the following line of code:

requests.post('https://YOUR_HOSTED_API/combine, json={'parts': json_object})
Enter fullscreen mode Exit fullscreen mode

To understand this piece of code, we have to look at the final step of completing the upload.

Complete the upload

We have uploaded all parts of the file, but these parts are not yet combined. To combine them we need to tell the s3 that we have finished uploading and that now you can combine the parts. The above request calls the route in the table below and completes the multipart upload using the s3 utility class. It provides the proper information about the file and the upload_id, which tells s3 about the parts of the same file being uploaded using the upload_id.

@app.route("/combine",methods=["POST"])
def combine():
    body = request.form
    body = body['parts']    
    session = Session()
    s3util = Presigned(session)
    parts = body['parts']
    id, key, bucket_name = body['upload_id'], body['key'], body['bucket_name']
    PARTS = [{"Etag": eval(x), 'PartNumber': int(y)} for x, y in parts]
    s3util.complete(PARTS, id, key, bucket_name)
    return Response(status_code=200)
Enter fullscreen mode Exit fullscreen mode

This code is a very minimum required code to create a CLI tool. You can deploy it on a server, which has proper roles in AWS for interacting with S3, to create and return the pre-signed URLs for completing the multipart upload. This way, you can make sure that no one has direct access to your S3 bucket. Instead, they upload the data using pre-signed URLs, which is a secure way of uploading the data.

Discussion (0)

pic
Editor guide