DEV Community

Cover image for Guide - AWS Glue and PySpark
Anand
Anand

Posted on

Guide - AWS Glue and PySpark

In this post, I have penned down AWS Glue and PySpark functionalities which can be helpful when thinking of creating AWS pipeline and writing AWS Glue PySpark scripts.

AWS Glue is a fully managed extract, transform, and load (ETL) service to process large amount of datasets from various sources for analytics and data processing.

While creating the AWS Glue job, you can select between Spark, Spark Streaming and Python shell. These job can run proposed script generated by AWS Glue, or an existing script that you provide or a new script authored by you. Along with this you can select different monitoring options, job execution capacity, timeouts, delayed notification threshold and non-overridable and overridable parameters.

Glue Job Type and Glue Version

Script file name and other available options

AWS recently launched Glue version 2.0 which features 10x faster Spark ETL job start times and reducing the billing duration from a 10 minute minimum to 1 minute minimum.

https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration

With AWS Glue you can create development endpoint and configure SageMaker or Zeppelin notebooks to develop and test your Glue ETL scripts.

I create a SageMaker notebook connected to the Dev endpoint to author and test the ETL scripts. Depending on the language you are comfortable with, you can spin up the notebook.

Now, lets talk about some specific features and functionalities in AWS Glue and PySpark which can be helpful.

1. Spark DataFrames

Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database. You can create DataFrame from RDD, from file formats like csv, json, parquet.

With SageMaker Sparkmagic(PySpark) Kernel notebook, Spark session is automatically created.

To create DataFrame -

# from CSV files 
S3_IN = "s3://mybucket/train/training.csv"

csv_df = (
    spark.read.format("org.apache.spark.csv")
    .option("header", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("inferSchema", True)
    .option("ignoreLeadingWhiteSpace", True)
    .option("ignoreTrailingWhiteSpace", True)
    .csv(S3_IN, multiLine=False)
)

# from PARQUET files 
S3_PARQUET="s3://mybucket/folder1/dt=2020-08-24-19-28/"

df = spark.read.parquet(S3_PARQUET)

# from JSON files
df = spark.read.json(S3_JSON)

# from multiline JSON file 
df = spark.read.json(S3_JSON, multiLine=True)

2. GlueContext

GlueContext is the entry point for reading and writing DynamicFrames in AWS Glue. It wraps the Apache SparkSQL SQLContext object providing mechanisms for interacting with the Apache Spark platform.

from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())

3. DynamicFrame

AWS Glue DynamicFrames are similar to SparkSQL DataFrames. It represent a distributed collection of data without requiring you to specify a schema.It can also be used to read and transform data that contains inconsistent values and types.

DynamicFrame can be created using the below options –

  • create_dynamic_frame_from_rdd – created from an Apache Spark Resilient Distributed Dataset (RDD)
  • create_dynamic_frame_from_catalog – created using a Glue catalog database and table name
  • create_dynamic_frame_from_options – created with the specified connection and format. Example – The connection type, such as Amazon S3, Amazon Redshift, and JDBC

DynamicFrames can be converted to and from DataFrames using .toDF() and fromDF().

#create DynamicFame from S3 parquet files
datasource0 = glueContext.create_dynamic_frame_from_options(
            connection_type="s3",
            connection_options = {
                "paths": [S3_location]
            },
            format="parquet",
            transformation_ctx="datasource0")

#create DynamicFame from glue catalog 
datasource0 = glueContext.create_dynamic_frame.from_catalog(
           database = "demo",
           table_name = "testtable",
           transformation_ctx = "datasource0")

#convert to spark DataFrame 
df1 = datasource0.toDF()

#convert to Glue DynamicFrame
df2 = DynamicFrame.fromDF(df1, glueContext , "df2")

Further Read - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog

4. AWS Glue Job Bookmark

AWS Glue Job bookmark helps process incremental data when rerunning the job on a scheduled interval, preventing reprocessing of old data.

Further Read - https://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html

5. Write out data

The DynamicFrame of transformed dataset can be written out to S3 as non-partitioned (default) or partitioned. "partitionKeys" parameter can be specified in connection_option to write out the data to S3 as partitioned. AWS Glue organizes these dataset in Hive-style partition.

In the below code example, AWS Glue DynamicFrame is partitioned by year, month, day, hour and written in parquet format in Hive-style partition on to S3.

s3://bucket_name/table_name/year=2020/month=7/day=13/hour=14/part-000-671c.c000.snappy.parquet

S3_location = "s3://bucket_name/table_name"

datasink = glueContext.write_dynamic_frame_from_options(
    frame= data,
    connection_type="s3",
    connection_options={
        "path": S3_location,
        "partitionKeys": ["year", "month", "day", "hour"]
    },
    format="parquet",
    transformation_ctx ="datasink")

Further Read - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options

6. "glueparquet" format option

glueparquet is a performance optimized Apache parquet writer type for writing DynamicFrames. It computes and modifies the schema dynamically.

datasink = glueContext.write_dynamic_frame_from_options(
               frame=dynamicframe,
               connection_type="s3",
               connection_options={
                  "path": S3_location,
                  "partitionKeys": ["year", "month", "day", "hour"]
               },
               format="glueparquet",
               format_options = {"compression": "snappy"},
               transformation_ctx ="datasink")

Further Read - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html

7. S3 Lister and other options for optimizing memory management

AWS Glue provides an optimized mechanism to list files on S3 while reading data into DynamicFrame which can be enabled using additional_options parameter "useS3ListImplementation" to true.

Further Read - https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/

8. Purge S3 path

purge_s3_path is a nice option available to delete files from specified S3 path recursively based on retention period or other available filters. As an example, suppose you are running AWS Glue job to fully refresh the table per day writing the data to S3 with naming convention of s3://bucket-name/table-name/dt=<data-time>. Based on the defined retention period using the Glue job itself you can delete the dt=<date-time> s3 folders. Another option is to set S3 bucket lifecycle policy with prefix.

#purge locations older than 3 days
print("Attempting to purge S3 path with retention set to 3 days.")
glueContext.purge_s3_path(
    s3_path=output_loc, 
    options={"retentionPeriod": 72})

You have other options like purge_table, transition_table and transition_s3_path also available. The transition_table option transitions the storage class of the files stored on Amazon S3 for the specified catalog's database and table.

Further Read - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path

9. Relationalize Class

Relationalize class can help flatten nested json outermost level.

Further read - https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/

10. Unbox Class

The Unbox class helps unbox string field in DynamicFrame to specified format type(optional).

Further read - https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/

11. Unnest Class

The Unnest class flattens nested objects to top-level elements in a DynamicFrame.

root
|-- id: string
|-- type: string
|-- content: map
|    |-- keyType: string
|    |-- valueType: string

With content attribute/column being map Type, we can use unnest class to unnest each key elements.

unnested = UnnestFrame.apply(frame=data_dynamic_dframe)
unnested.printSchema()
root
|-- id: string
|-- type: string
|-- content.dateLastUpdated: string
|-- content.creator: string
|-- content.dateCreated: string
|-- content.title: string

12. printSchema()

To print the Spark or Glue DynamicFrame schema in tree format use printSchema().

datasource0.printSchema()

root
|-- ID: int
|-- Name: string
|-- Identity: string
|-- Alignment: string
|-- EyeColor: string
|-- HairColor: string
|-- Gender: string
|-- Status: string
|-- Appearances: int
|-- FirstAppearance: choice
|    |-- int
|    |-- long
|    |-- string
|-- Year: int
|-- Universe: string

13. Fields Selection

select_fields can be used to select fields from Glue DynamicFrame.

# From DynamicFrame

datasource0.select_fields(["Status","HairColor"]).toDF().distinct().show()

To select fields from Spark Dataframe use "select" -

# From Dataframe

datasource0_df.select(["Status","HairColor"]).distinct().show()

14. Timestamp

Suppose the application writes data into DynamoDB and has last_updated attribute/column. DynamoDB does not natively support date/timestamp data type. So, you could either store it as String or Number. If stored as number, its usually done as epoch time - the number of seconds since 00:00:00 UTC on 1 January 1970. You could see something like "1598331963" which is 2020-08-25T05:06:03+00:00 in ISO 8601.

https://www.unixtimestamp.com/index.php

How can you convert it to timestamp?

When you read the data using AWS Glue DynamicFrame and view the schema, it will show it as "long" data type.

root
|-- version: string
|-- item_id: string
|-- status: string
|-- event_type: string
|-- last_updated: long

To convert the last_updated long data type into timestamp data type, you can use the below -

import pyspark.sql.functions as f
import pyspark.sql.types as t

new_df = (
    df
        .withColumn("last_updated", f.from_unixtime(f.col("last_updated")/1000).cast(t.TimestampType()))
)   

15. Temporary View from Spark DataFrame

If you want to store the Spark DataFrame as table and query it using spark sql, you can convert the DataFrame into temporary view that is available for only that spark session using createOrReplaceTempView.

df = spark.createDataFrame(
    [
        (1, ['a', 'b', 'c'], 90.00),
        (2, ['x', 'y'], 99.99),
    ],
    ['id', 'event', 'score'] 
)

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- event: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: double (nullable = true)

df.createOrReplaceTempView("example")

spark.sql("select * from example").show()

+---+---------+-----+
| id|    event|score|
+---+---------+-----+
|  1|[a, b, c]| 90.0|
|  2|   [x, y]|99.99|
+---+---------+-----+

16. Extract element from ArrayType

Suppose from the above example, you want to create a new attribute/column to store only the last event. How would you do it?

Using element_at function. It returns element of array at given index in extraction if col is array. It can also be used to extract given key in extraction if col is map.

import pyspark.sql.functions as element_at

newdf = df.withColumn("last_event", element_at("event", -1))

newdf.printSchema()
root
 |-- id: long (nullable = true)
 |-- event: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: double (nullable = true)
 |-- last_event: string (nullable = true)

newdf.show()
+---+---------+-----+----------+
| id|    event|score|last_event|
+---+---------+-----+----------+
|  1|[a, b, c]| 90.0|         c|
|  2|   [x, y]|99.99|         y|
+---+---------+-----+----------+

17. explode

The explode function in PySpark is used to explode array or map columns in rows. Taking an example, lets try to explode "event" column from the above example

from pyspark.sql.functions import explode

df1 = df.select(df.id,explode(df.event))

df1.printSchema()
root
 |-- id: long (nullable = true)
 |-- col: string (nullable = true)

df1.show()
+---+---+
| id|col|
+---+---+
|  1|  a|
|  1|  b|
|  1|  c|
|  2|  x|
|  2|  y|
+---+---+

18. getField

In a Struct type, if you want to get a field by name, you can use "getField".

import pyspark.sql.functions as f
from pyspark.sql import Row

from pyspark.sql import Row
df = spark.createDataFrame([Row(attributes=Row(Name='scott', Height=6.0, Hair='black')),
                            Row(attributes=Row(Name='kevin', Height=6.1, Hair='brown'))]
)

df.printSchema()
root
 |-- attributes: struct (nullable = true)
 |    |-- Hair: string (nullable = true)
 |    |-- Height: double (nullable = true)
 |    |-- Name: string (nullable = true)

df.show()
+-------------------+
|         attributes|
+-------------------+
|[black, 6.0, scott]|
|[brown, 6.1, kevin]|
+-------------------+

df1 = (df
      .withColumn("name", f.col("attributes").getField("Name"))
      .withColumn("height", f.col("attributes").getField("Height"))
      .drop("attributes")
      )

df1.show()
+-----+------+
| name|height|
+-----+------+
|scott|   6.0|
|kevin|   5.1|
+-----+------+

19. startswith

If you want to find records based on string match you can use "startswith".

In the below example I am searching for all records where value for description column starts with "[{".

import pyspark.sql.functions as f

df.filter(f.col("description").startswith("[{")).show()

20. Extract year, month, day, hour

One of the common use case is to write the AWS Glue DynamicFrame or Spark DataFrame to S3 in Hive-style partition. To do so you can extract year, month, day, hour and use it as partitionkeys to write the DynamicFrame/DataFrame to S3.

import pyspark.sql.functions as f

df2 = (raw_df
        .withColumn('year', f.year(f.col('last_updated')))
        .withColumn('month', f.month(f.col('last_updated')))
        .withColumn('day', f.dayofmonth(f.col('last_updated')))
        .withColumn('hour', f.hour(f.col('last_updated')))            
        )

Top comments (0)