DEV Community 👩‍💻👨‍💻

Discussion on: Migrating to DynamoDB using Parquet and Glue

Collapse
lukaszbudnik profile image
Łukasz Budnik Author

Hi @kimsean ,

Sorry was on annual leave and wasn't checking my emails and notifications. No problem here is my code, I have replaced any account-specific details with XXX. I also removed the automatically generated comments to keep it short.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame

# this is the only part that I had to write
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    glueContext.write_dynamic_frame_from_options(
        frame=dfc,
        connection_type="dynamodb",
        connection_options={
            "dynamodb.output.tableName": "XXX",
            "dynamodb.throughput.write.percent": "0.95"
        }
    )

# created automatically by AWS Glue Studio
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# DataSource - created automatically by Glue Studio
DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "s3", format = "parquet", connection_options = {"paths": ["s3://XXX/parquet/2021/04/29/public/test_02/"], "recurse":True}, transformation_ctx = "DataSource0")

# Mapping - created automatically by Glue Studio
# there are 20 columns in my data set, I removed 18 mappings to keep it short
# but this mapping was generated automatically by Glue Studio, didn't have to do anything here
Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("id", "string", "id", "string"), ..., ("status", "string", "status", "string")], transformation_ctx = "Transform0")

# MyTransform - created by Glue Studio as a part of the custom transformation block
Transform1 = MyTransform(glueContext, DynamicFrameCollection({"Transform0": Transform0}, glueContext))

job.commit()
Enter fullscreen mode Exit fullscreen mode