This blog will demonstrate how to create and run an entirely serverless ETL workflow using Step Functions to execute a Glue job that reads CSV data from S3, carries out transformations in PySpark and writes the results to a destination key in S3 in Parquet format. This will then trigger a Glue crawler to create or update catalog tables with the metadata from the Parquet files. A successful run should then send an SNS notification to a user by email.
We will use the LastFM dataset, which represents the listening habits of nearly 1,000 users. The data is split into two TSV files: one containing user profiles (gender, age, location, registration date) and the other containing details of the music tracks each user has listened to, with an associated timestamp.
Using AWS Glue, we can carry out data transformations in PySpark to generate insights about the users, such as the following:
- Number of distinct songs each user has played.
- 100 most popular songs (artist and title) in the dataset, with the number of times each was played.
- Top 10 longest sessions (by elapsed time), with the associated userid, the timestamps of the first and last songs in the session, and the list of songs played in the session (in order of play). A user's "session" is assumed to comprise one or more songs played by that user, where each song is started within 20 minutes of the previous song's start time.
Glue Notebook and Spark Transformations
We will create a Glue job by uploading this notebook to AWS Glue Studio notebooks. Before setting up any resources, let's first go through the various code snippets and functions in the notebook to describe the transformation steps that answer the questions listed above.
The first cell imports and initializes a GlueContext object, which is used to create a SparkSession to be used inside the AWS Glue job.
Spark provides a number of classes (StructType, StructField) to specify the structure of a Spark DataFrame. StructType is a collection of StructField objects, each of which defines a column name, a data type and a flag for whether the column is nullable.
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql import DataFrame, Window
# column functions used in the transformation steps below
from pyspark.sql.functions import (
    col, collect_list, count, desc, expr, lag, max, min, round, sum,
)
from pyspark.sql.types import (
    StringType,
    StructField,
    StructType,
    TimestampType,
)

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
client = boto3.client('s3')
SESSION_SCHEMA = StructType(
[
StructField("userid", StringType(), False),
StructField("timestamp", TimestampType(), True),
StructField("artistid", StringType(), True),
StructField("artistname", StringType(), True),
StructField("trackid", StringType(), True),
StructField("trackname", StringType(), True),
]
)
S3_PATH="s3://lastfm-dataset/user-session-track.tsv"
BUCKET="lastfm-dataset"
This function will read the LastFM dataset (tab-separated, read with the CSV reader and a tab delimiter) into a Spark DataFrame from the S3 bucket lastfm-dataset, using the S3_PATH and schema definition defined above. We will drop the columns we do not need. The schema is printed below by calling the printSchema() method of the Spark DataFrame.
def read_session_data(spark):
data = (
spark.read.format("csv")
.option("header", "false")
.option("delimiter", "\t")
.schema(SESSION_SCHEMA)
.load(S3_PATH)
)
cols_to_drop = ("artistid", "trackid")
return data.drop(*cols_to_drop).cache()
df = read_session_data(spark)
df.printSchema()
The function create_users_and_distinct_songs_count will compute the number of distinct songs played by each user, by selecting the userid, artistname and trackname columns, dropping duplicate rows and performing a groupBy count for each userid.
def create_users_and_distinct_songs_count(df: DataFrame) -> DataFrame:
df1 = df.select("userid", "artistname", "trackname").dropDuplicates()
df2 = (
df1.groupBy("userid")
.agg(count("*").alias("DistinctTrackCount"))
.orderBy(desc("DistinctTrackCount"))
)
return df2
songs_per_user = create_users_and_distinct_songs_count(df)
songs_per_user.show()
The create_popular_songs function performs a groupBy count over the artistname and trackname columns, then orders the result in descending order of count and applies a limit to get the 100 most popular songs.
def create_popular_songs(df: DataFrame, limit=100) -> DataFrame:
df1 = (
df.groupBy("artistname", "trackname")
.agg(count("*").alias("CountPlayed"))
.orderBy(desc("CountPlayed"))
.limit(limit)
)
return df1
popular_songs = create_popular_songs(df)
popular_songs.show()
The next snippet uses a window function to lag the previous timestamp within each user partition and computes the difference (in minutes) between the current and previous timestamps for each user. We then create a binary sessionflag column, set to 1 whenever the time between successively played tracks exceeds session_cutoff (20 minutes). A sessionID column is then computed as a cumulative sum over the sessionflag column for each user.
We then group the Spark DataFrame by userid and sessionID and compute the min and max timestamps as session start and end columns. A session_length(hrs) column computes the difference between the session end and start for each row, converted to hours. The DataFrame is ordered from longest to shortest session and limited to the top 10 sessions as required.
To get the list of tracks for each session, we join the top sessions back to the DataFrame containing the session IDs and group by userid, sessionID and session_length(hrs). We then apply the pyspark.sql function collect_list to each group to create a list of tracks for each session.
def create_session_ids_for_all_users(
df: DataFrame, session_cutoff: int
) -> DataFrame:
w1 = Window.partitionBy("userid").orderBy("timestamp")
df1 = (
df.withColumn("pretimestamp", lag("timestamp").over(w1))
.withColumn(
"delta_mins",
round(
(
col("timestamp").cast("long")
- col("pretimestamp").cast("long")
)
/ 60
),
)
.withColumn(
"sessionflag",
expr(
f"CASE WHEN delta_mins > {session_cutoff} OR delta_mins IS NULL THEN 1 ELSE 0 END"
),
)
.withColumn("sessionID", sum("sessionflag").over(w1))
)
return df1
def compute_top_n_longest_sessions(df: DataFrame, limit: int) -> DataFrame:
df1 = (
df.groupBy("userid", "sessionID")
.agg(
min("timestamp").alias("session_start_ts"),
max("timestamp").alias("session_end_ts"),
)
.withColumn(
"session_length(hrs)",
round(
(
col("session_end_ts").cast("long")
- col("session_start_ts").cast("long")
)
/ 3600
),
)
.orderBy(desc("session_length(hrs)"))
.limit(limit)
)
return df1
def longest_sessions_with_tracklist(
df: DataFrame, session_cutoff: int = 20, limit: int = 10
) -> DataFrame:
df1 = create_session_ids_for_all_users(df, session_cutoff)
df2 = compute_top_n_longest_sessions(df1, limit)
df3 = (
df1.join(df2, ["userid", "sessionID"])
.select("userid", "sessionID", "trackname", "session_length(hrs)")
.groupBy("userid", "sessionID", "session_length(hrs)")
.agg(collect_list("trackname").alias("tracklist"))
.orderBy(desc("session_length(hrs)"))
)
return df3
df_sessions = longest_sessions_with_tracklist(df)
df_sessions.show()
Finally, the snippet below will convert each PySpark DataFrame to a Glue DynamicFrame and write it to the S3 bucket in Parquet format, using the write_dynamic_frame.from_options() method. By default this method saves the output files with an auto-generated name (e.g. starting with run- and containing part-00). It would be better to rename this to something simpler. To do this, we can use the copy_object() method of the boto3 S3 client to copy the existing object to a new key within the bucket (using a custom name, e.g. popular_songs.parquet). The original object can then be deleted using the delete_object() method.
def rename_s3_results_key(source_key_prefix, dest_key):
    # Find the auto-named output file, copy it to a friendlier key, then delete the original
    response = client.list_objects_v2(Bucket=BUCKET)
    body = response["Contents"]
    key = [obj['Key'] for obj in body if source_key_prefix in obj['Key']]
    client.copy_object(Bucket=BUCKET, CopySource={'Bucket': BUCKET, 'Key': key[0]}, Key=dest_key)
    client.delete_object(Bucket=BUCKET, Key=key[0])

def write_ddf_to_s3(df: DataFrame, name: str):
    # Repartition to a single partition so that one parquet file is written per result set
    dyf = DynamicFrame.fromDF(df.repartition(1), glueContext, name)
    sink = glueContext.write_dynamic_frame.from_options(
        frame=dyf,
        connection_type="s3",
        format="glueparquet",
        connection_options={"path": f"s3://{BUCKET}/results/{name}/", "partitionKeys": []},
        transformation_ctx=f"{name}_sink",
    )
    # Rename the auto-generated output file (run-...-part-...) to <name>.parquet
    source_key_prefix = f"results/{name}/run-"
    dest_key = f"results/{name}/{name}.parquet"
    rename_s3_results_key(source_key_prefix, dest_key)
    return sink
write_ddf_to_s3(popular_songs, "popular_songs")
write_ddf_to_s3(df_sessions, "df_sessions")
write_ddf_to_s3(songs_per_user, "distinct_songs")
In the next sections, we will set up all the resources defined in the architecture diagram and execute the state machine.
Data upload to S3
First we will create a standard bucket lastfm-dataset from the AWS console to store the source files in, and enable transfer acceleration in the bucket properties to optimise transfer speed. This will generate an accelerated endpoint of the form <bucket>.s3-accelerate.amazonaws.com, which can be used to upload the files via the CLI. Since some of these files are large, it is easier to use the aws s3 commands (such as aws s3 cp) for uploading to the S3 bucket, as these automatically use the multipart upload feature for large files (AWS recommends multipart uploads once objects exceed around 100 MB).
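If you prefer to script the upload instead, here is a minimal boto3 sketch that targets the acceleration endpoint; the local file name is a placeholder, while the bucket and key match the S3_PATH used in the notebook. upload_file performs managed (multipart) transfers automatically for large files.
import boto3
from botocore.config import Config

# S3 client configured to use the bucket's transfer acceleration endpoint
s3 = boto3.client("s3", config=Config(s3={"use_accelerate_endpoint": True}))

# upload_file handles multipart transfers under the hood for large files
s3.upload_file(
    Filename="userid-timestamp-artid-artname-traid-traname.tsv",  # placeholder local path
    Bucket="lastfm-dataset",
    Key="user-session-track.tsv",  # matches the S3_PATH used in the notebook
)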
AWS Glue Job and Crawler
We will then create a Glue job by uploading this notebook to AWS Glue Studio notebooks. We will first need to create a role for Glue to assume, with permissions to access S3, as below.
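As a rough guide, the role could also be created programmatically. The sketch below is illustrative only: the role name and the broad AmazonS3FullAccess policy are assumptions, and you would normally scope the S3 permissions down to the lastfm-dataset bucket.
import json
import boto3

iam = boto3.client("iam")

# Trust policy allowing the Glue service to assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName="GlueLastFMRole",  # illustrative name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# AWS managed policies: Glue service permissions plus S3 access
for policy_arn in [
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
]:
    iam.attach_role_policy(RoleName="GlueLastFMRole", PolicyArn=policy_arn)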
- In the Amazon Glue Studio console, choose Jobs from the navigation menu.
- In the Create Job options section, select Upload and then select the AWS_Glue_Notebook.ipynb file to upload.
- On the next screen, name the job LastFM_Analysis. Select the Glue role created previously from the IAM Role dropdown list. Choose the Spark kernel and select Start Notebook.
- We should see the notebook in the next screen. Click 'Save'.
If we navigate back to the AWS Glue Studio Jobs tab, we should see the new job LastFM_Analysis created.
We can now set up the Glue crawler from the AWS Glue console, using the settings in the screenshot below. The crawler will collect metadata from the Glue output Parquet files in S3 and create or update the Glue catalog tables.
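For reference, an equivalent crawler could be defined with boto3 along these lines. The crawler name matches the one referenced later in the state machine, while the database name and role are assumptions for this sketch.
import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="LastFM-crawler",     # referenced by the Step Function StartCrawler task
    Role="GlueLastFMRole",     # assumed role name from the earlier sketch
    DatabaseName="lastfm_db",  # hypothetical Glue catalog database
    Targets={"S3Targets": [{"Path": "s3://lastfm-dataset/results/"}]},  # crawl the job's parquet outputs
)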
SNS topic
We will also need to set up a subscription to an SNS topic so that notifications are sent to an email address, by following the AWS docs. We will set up a separate task at the end of the Step Function workflow to publish to the SNS topic. Alternatively, one could configure S3 event notifications for specific S3 key prefixes, so that any Parquet outputs written by the Glue job publish to the SNS topic.
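A minimal boto3 sketch for this step might look as follows; the email address is a placeholder, and the topic name Default matches the topic referenced in the state machine definition below.
import boto3

sns = boto3.client("sns")

# Create (or fetch) the topic referenced by the SNS Publish task in the state machine
topic_arn = sns.create_topic(Name="Default")["TopicArn"]

# Subscribe an email endpoint; the recipient must confirm the subscription
sns.subscribe(
    TopicArn=topic_arn,
    Protocol="email",
    Endpoint="user@example.com",  # placeholder address
)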
Once you have set up the subscription from the console, you should get an email notification asking you to confirm the subscription, as below:
Step Function setup and execution
Now we need to create a state machine. This is a workflow in AWS Step Functions, consisting of a set of states, each of which represents a single unit of work. The state machine is defined in Amazon States Language, a JSON-based notation. For this example, the Amazon States Language specification is shown below; we will use it when creating the state machine in the console.
{
"Comment": "Glue ETL flights pipeline execution",
"StartAt": "Glue StartJobRun",
"States": {
"Glue StartJobRun": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun",
"Parameters": {
"JobName": "LastFM_Analysis",
"MaxCapacity": 2
},
"ResultPath": "$.gluejobresults",
"Next": "Wait"
},
"Wait": {
"Type": "Wait",
"Seconds": 30,
"Next": "Get Glue Job status"
},
"Get Glue Job status": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:glue:getJobRun",
"Parameters": {
"JobName.$": "$.gluejobresults.JobName",
"RunId.$": "$.gluejobresults.JobRunId"
},
"Next": "Check Glue Job status",
"ResultPath": "$.gluejobresults.status"
},
"Check Glue Job status": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.gluejobresults.status.JobRun.JobRunState",
"StringEquals": "SUCCEEDED",
"Next": "StartCrawler"
}
],
"Default": "Wait"
},
"StartCrawler": {
"Type": "Task",
"Parameters": {
"Name": "LastFM-crawler"
},
"Resource": "arn:aws:states:::aws-sdk:glue:startCrawler",
"Next": "Wait for crawler to complete"
},
"Wait for crawler to complete": {
"Type": "Wait",
"Seconds": 70,
"Next": "SNS Publish Success"
},
"SNS Publish Success": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:*:Default",
"Message.$": "$"
},
"Next": "Success"
},
"Success": {
"Type": "Succeed"
}
}
}
Before creating the state machine, we will also need to create a role for Step Functions to assume, with permissions to call the various services, e.g. Glue, Athena, SNS and CloudWatch (if logging will be enabled when creating the state machine), using AWS managed policies as below.
In the Step Functions console, in the State Machine tab:
- Select Create State Machine
- Select "Write your workflow in code" with Type "Standard"
- Paste in the state language specification. This will generate a visual representation of the state machine as below, if the definition is valid.
- Select Next, and in the "Specify Details" section fill in the state machine name, select the execution role created previously from the dropdown and turn on logging to CloudWatch. Then click "Create State Machine".
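The same state machine could also be created programmatically, for example with boto3. In this sketch the state machine name, role ARN and definition file are placeholders; the definition is simply the Amazon States Language JSON shown above serialised as a string.
import json
import boto3

sfn = boto3.client("stepfunctions")

# Load the Amazon States Language specification shown above (hypothetical file name)
with open("lastfm_state_machine.json") as f:
    definition = json.load(f)

response = sfn.create_state_machine(
    name="LastFM-ETL-StateMachine",  # placeholder name
    definition=json.dumps(definition),
    roleArn="arn:aws:iam::123456789012:role/StepFunctionsLastFMRole",  # placeholder role ARN
    type="STANDARD",
)
print(response["stateMachineArn"])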
Let us go through what each of the states will be doing. The first task state, Glue StartJobRun, will start the Glue job LastFM_Analysis with a capacity of 2 data processing units (DPUs), as specified in the Parameters block. The output of this state is added under the ResultPath $.gluejobresults alongside the original input. This gives access to Glue job metadata such as the job run ID, status and job name, to be used as parameters in subsequent states.
The next state is a Wait state, which pauses the execution of the state machine for 30 seconds before proceeding to the task that checks the Glue job status. Using a Choice state, we add a condition to proceed to the next task (StartCrawler) if the Glue job status is SUCCEEDED; otherwise the workflow loops back to the Wait state and waits for another 30 seconds before checking again. This ensures we only start crawling the data in S3 once the Glue job has completed, so the output Parquet files are available and ready to be crawled.
Similarly, after the StartCrawler task, we add a Wait state to pause the step function for 70 seconds (we expect the crawler to complete within about a minute), to ensure that a notification is sent to the SNS topic Default only when the crawler has completed.
Now the state machine can be executed. If the step function completes successfully, we should see an output similar to below.
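If you prefer to start the execution from code rather than the console, it could look roughly like this; the state machine ARN is a placeholder, and no input payload is needed because the workflow hard-codes the job, crawler and topic names.
import boto3

sfn = boto3.client("stepfunctions")

# Start an execution of the state machine created above (placeholder ARN)
execution = sfn.start_execution(
    stateMachineArn="arn:aws:states:eu-west-1:123456789012:stateMachine:LastFM-ETL-StateMachine",
    input="{}",  # no input needed; the workflow hard-codes the job, crawler and topic names
)
print(execution["executionArn"])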
If the glue job is successful, we should see the parquet files in dedicated subfolders in the results folder in the S3 bucket. You should also get a notification to the email subscribed to the SNS topic.
The catalog tables should be created after successful completion of the crawler. We can now query the tables in Athena as below. The tables could also be accessed via Amazon QuickSight or Tableau to generate visualisation dashboards for further insights.
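For example, a query against one of the crawled tables could be run from code along these lines; the database, table and output location names here are assumptions based on this setup rather than values confirmed in the post.
import boto3

athena = boto3.client("athena")

# Query the crawled popular_songs table (database, table and output path assumed)
athena.start_query_execution(
    QueryString="SELECT artistname, trackname, countplayed "
                "FROM popular_songs ORDER BY countplayed DESC LIMIT 10",
    QueryExecutionContext={"Database": "lastfm_db"},  # hypothetical catalog database
    ResultConfiguration={"OutputLocation": "s3://lastfm-dataset/athena-results/"},  # hypothetical output path
)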