Ryan Nazareth for AWS Community Builders

Building an entirely Serverless Workflow to Analyse Music Data using Step Functions, Glue and Athena

This blog will demonstrate how to create and run an entirely serverless ETL workflow that uses Step Functions to execute a Glue job, which reads the raw TSV data from S3, carries out transformations in PySpark and writes the results to an S3 destination key in parquet format. The workflow then triggers a Glue crawler to create or update catalog tables with the metadata from the parquet files. Finally, a successful run sends an SNS notification to a user by email.

(Image: architecture diagram of the serverless workflow)

We will use the LastFM dataset, which represents the listening habits of nearly 1,000 users. It is split into two TSV files: one containing user profiles (gender, age, location, registration date) and the other containing details of the music tracks each user has listened to, with associated timestamps.
Using AWS Glue, we can carry out data transformations in PySpark to generate insights about the users, such as the following:

  • Number of distinct songs each user has played.
  • 100 most popular songs (artist and title) in the dataset, with the number of times each was played.
  • Top 10 longest sessions (by elapsed time), with the associated userid, the timestamps of the first and last songs in the session, and the list of songs played in the session (in order of play). A user's “session” is assumed to consist of one or more songs played by that user, where each song is started within 20 minutes of the previous song's start time.

Glue Notebook and Spark Transformations

We will create a Glue job by uploading this notebook to AWS Glue Studio. Before setting up any resources, let's first go through the code snippets and functions in the notebook that implement the transformation steps needed to answer the questions listed above.

The first cell imports and initializes a GlueContext object, which is used to create the SparkSession used inside the AWS Glue job.
Spark provides the StructType and StructField classes to specify the structure of a Spark DataFrame: a StructType is a collection of StructField objects, each of which defines a column name, a data type and a flag indicating whether the column is nullable.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import (
    StringType,
    StructField,
    StructType,
    TimestampType,
)
# Additional imports used by the transformation and write cells further below.
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import (
    col,
    collect_list,
    count,
    desc,
    expr,
    lag,
    max,
    min,
    round,
    sum,
)
from awsglue.dynamicframe import DynamicFrame
import boto3

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
client = boto3.client('s3')


SESSION_SCHEMA = StructType(
    [
        StructField("userid", StringType(), False),
        StructField("timestamp", TimestampType(), True),
        StructField("artistid", StringType(), True),
        StructField("artistname", StringType(), True),
        StructField("trackid", StringType(), True),
        StructField("trackname", StringType(), True),
    ]
)

S3_PATH="s3://lastfm-dataset/user-session-track.tsv"
BUCKET="lastfm-dataset"


This function reads the tab-delimited LastFM dataset from the S3 bucket lastfm-dataset into a Spark DataFrame, using the csv reader with the S3_PATH and schema definition defined above, and drops the columns we do not need. The schema is printed below by calling the printSchema() method of the Spark DataFrame.

def read_session_data(spark):
    data = (
        spark.read.format("csv")
        .option("header", "false")
        .option("delimiter", "\t")
        .schema(SESSION_SCHEMA)
        .load(S3_PATH)
    )
    cols_to_drop = ("artistid", "trackid")
    return data.drop(*cols_to_drop).cache()

df = read_session_data(spark)
df.printSchema()

(Image: output of df.printSchema())

The function create_users_and_distinct_songs_count computes the number of distinct songs each user has played, by selecting the columns userid, artistname and trackname, dropping duplicate rows and performing a groupBy count for each userid.

def create_users_and_distinct_songs_count(df: DataFrame) -> DataFrame:
    df1 = df.select("userid", "artistname", "trackname").dropDuplicates()
    df2 = (
        df1.groupBy("userid")
        .agg(count("*").alias("DistinctTrackCount"))
        .orderBy(desc("DistinctTrackCount"))
    )
    return df2

songs_per_user = create_users_and_distinct_songs_count(df)
songs_per_user.show()

(Image: output of songs_per_user.show())

The create_popular_songs function performs a groupBy count over the artistname and trackname columns, orders the result in descending order of counts and applies a limit to return the 100 most popular songs.

def create_popular_songs(df: DataFrame, limit=100) -> DataFrame:
    df1 = (
        df.groupBy("artistname", "trackname")
        .agg(count("*").alias("CountPlayed"))
        .orderBy(desc("CountPlayed"))
        .limit(limit)
    )
    return df1

popular_songs = create_popular_songs(df)
popular_songs.show()

(Image: output of popular_songs.show())

The next snippet uses a window function partitioned by user and ordered by timestamp to lag the previous timestamp, and computes the difference in minutes between the current and previous timestamps for each user. We then create a binary sessionflag column per user, set to 1 whenever the time between successively played tracks exceeds session_cutoff (20 minutes). A sessionID column then computes a cumulative sum over the sessionflag column for each user.

We then group the Spark DataFrame by userid and sessionID and compute the min and max timestamps as session start and end columns. Next we create a session_length(hrs) column, which computes the difference between session end and start for each row and converts it to hours, order the DataFrame from longest to shortest session length and limit it to the top 10 sessions as required.

To get the list of tracks for each session, we join back to the original raw DataFrame and group by userid, sessionID and session_length in hours, then apply the pyspark.sql function collect_list to each group to create a list of tracks for each session.

def create_session_ids_for_all_users(
    df: DataFrame, session_cutoff: int
) -> DataFrame:
    w1 = Window.partitionBy("userid").orderBy("timestamp")
    df1 = (
        df.withColumn("pretimestamp", lag("timestamp").over(w1))
        .withColumn(
            "delta_mins",
            round(
                (
                    col("timestamp").cast("long")
                    - col("pretimestamp").cast("long")
                )
                / 60
            ),
        )
        .withColumn(
            "sessionflag",
            expr(
                f"CASE WHEN delta_mins > {session_cutoff} OR delta_mins IS NULL THEN 1 ELSE 0 END"
            ),
        )
        .withColumn("sessionID", sum("sessionflag").over(w1))
    )
    return df1


def compute_top_n_longest_sessions(df: DataFrame, limit: int) -> DataFrame:
    df1 = (
        df.groupBy("userid", "sessionID")
        .agg(
            min("timestamp").alias("session_start_ts"),
            max("timestamp").alias("session_end_ts"),
        )
        .withColumn(
            "session_length(hrs)",
            round(
                (
                    col("session_end_ts").cast("long")
                    - col("session_start_ts").cast("long")
                )
                / 3600
            ),
        )
        .orderBy(desc("session_length(hrs)"))
        .limit(limit)
    )
    return df1


def longest_sessions_with_tracklist(
    df: DataFrame, session_cutoff: int = 20, limit: int = 10
) -> DataFrame:
    df1 = create_session_ids_for_all_users(df, session_cutoff)
    df2 = compute_top_n_longest_sessions(df1, limit)
    df3 = (
        df1.join(df2, ["userid", "sessionID"])
        .select("userid", "sessionID", "trackname", "session_length(hrs)")
        .groupBy("userid", "sessionID", "session_length(hrs)")
        .agg(collect_list("trackname").alias("tracklist"))
        .orderBy(desc("session_length(hrs)"))
    )
    return df3

df_sessions = longest_sessions_with_tracklist(df)
df_sessions.show()

(Image: output of df_sessions.show())

Finally, the snippet below converts each PySpark DataFrame to a Glue DynamicFrame and writes it to the S3 bucket in parquet format, using the write_dynamic_frame.from_options() method. By default, this method saves the output files under auto-generated names (prefixed with run-). It would be better to rename these to something simpler. To do this, we can use the copy_object() method of the boto3 S3 client to copy the existing object to a new key within the bucket (using a custom name, e.g. popular_songs.parquet). The original object can then be deleted using the delete_object() method.

def rename_s3_results_key(source_key_prefix, dest_key):
    # Find the auto-generated key written by the sink under the given prefix.
    response = client.list_objects_v2(Bucket=BUCKET, Prefix=source_key_prefix)
    key = [obj["Key"] for obj in response["Contents"]]
    # Copy the object to the simpler destination key, then delete the original.
    client.copy_object(
        Bucket=BUCKET, CopySource={"Bucket": BUCKET, "Key": key[0]}, Key=dest_key
    )
    client.delete_object(Bucket=BUCKET, Key=key[0])


def write_ddf_to_s3(df: DataFrame, name: str):
    # Repartition to a single output file before converting to a DynamicFrame.
    dyf = DynamicFrame.fromDF(df.repartition(1), glueContext, name)
    sink = glueContext.write_dynamic_frame.from_options(
        frame=dyf,
        connection_type="s3a",
        format="glueparquet",
        connection_options={"path": f"s3a://{BUCKET}/results/{name}/", "partitionKeys": []},
        transformation_ctx=f"{name}_sink",
    )
    source_key_prefix = f"results/{name}/run-"
    dest_key = f"results/{name}/{name}.parquet"
    rename_s3_results_key(source_key_prefix, dest_key)
    return sink

write_ddf_to_s3(popular_songs, "popular_songs")
write_ddf_to_s3(df_sessions, "df_sessions")
write_ddf_to_s3(songs_per_user, "distinct_songs")

In the next sections we will set up all the resources defined in the architecture diagram and execute the state machine.

Data upload to S3

First we will create a standard bucket named lastfm-dataset from the AWS console to store the source files in, and enable transfer acceleration in the bucket properties to optimise transfer speed. This generates an accelerated endpoint (lastfm-dataset.s3-accelerate.amazonaws.com) which can be used for uploads from the CLI. Since some of these files are large, it is easiest to use the aws s3 commands (such as aws s3 cp) for uploading to the S3 bucket, as they automatically switch to multipart upload once a file exceeds the configured multipart threshold.
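
The upload can also be scripted with boto3 rather than the CLI. The sketch below is a rough example, not part of the original workflow: the local filename is an assumption, and the multipart threshold shown is just an illustrative value.

import boto3
from botocore.config import Config
from boto3.s3.transfer import TransferConfig

# S3 client configured to use the bucket's transfer-accelerated endpoint.
s3 = boto3.client("s3", config=Config(s3={"use_accelerate_endpoint": True}))

# upload_file switches to multipart upload automatically once the file size
# exceeds multipart_threshold (the value below is an example, not the default).
transfer_config = TransferConfig(multipart_threshold=100 * 1024 * 1024)

s3.upload_file(
    Filename="user-session-track.tsv",  # hypothetical local path
    Bucket="lastfm-dataset",
    Key="user-session-track.tsv",
    Config=transfer_config,
)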

(Image: creating the S3 bucket)

(Image: enabling transfer acceleration in the bucket properties)

AWS Glue Job and Crawler

We will then create a Glue job by uploading this notebook to AWS Glue Studio. We first need to create a role for Glue to assume, with permissions to access S3, as below.

Glue Job role
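
The role can be created from the IAM console, or scripted with boto3 as in the sketch below. The role name GlueLastFMRole is an assumption, and the S3 managed policy is broader than necessary; in practice you would scope it down to the lastfm-dataset bucket.

import json
import boto3

iam = boto3.client("iam")

# Trust policy allowing the Glue service to assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName="GlueLastFMRole",  # hypothetical role name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the Glue service role policy plus S3 access.
for policy_arn in (
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
):
    iam.attach_role_policy(RoleName="GlueLastFMRole", PolicyArn=policy_arn)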

  • In the Amazon Glue Studio console, choose Jobs from the navigation menu.
  • In the Create Job options section, select Upload and then select the AWS_Glue_Notebook.ipynb file to upload.

(Image: uploading the notebook in AWS Glue Studio)

  • On the next screen, name your job LastFM_Analysis, select the Glue role created previously in the IAM Role dropdown list, choose the Spark kernel and click Start Notebook.

(Image: notebook job setup in AWS Glue Studio)

  • We should see the notebook in the next screen. Click 'Save'. If we navigate back to the AWS Glue Studio Jobs tab, we should see the new job LastFM_Analysis created.

(Image: the LastFM_Analysis job in the AWS Glue Studio Jobs tab)
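
The notebook-backed job above is created through the console, but for completeness a script-based job could be created with boto3 along the following lines. This is only a sketch: the script location and role name are assumptions, and capacity is left to the MaxCapacity passed by the state machine at run time.

import boto3

glue = boto3.client("glue")

# Assumes the notebook has been exported as a script and uploaded to S3.
glue.create_job(
    Name="LastFM_Analysis",
    Role="GlueLastFMRole",  # hypothetical role name from earlier
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://lastfm-dataset/scripts/lastfm_analysis.py",  # hypothetical path
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
)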

We can now set up the Glue crawler from the AWS Glue console with the settings in the screenshot below. The crawler will collect metadata from the parquet files the Glue job writes to S3 and update the Glue catalog tables.

(Image: Glue crawler settings)
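
If you prefer to script the crawler instead of using the console, a minimal boto3 sketch is shown below; the role name and the Glue catalog database name (lastfm) are assumptions.

import boto3

glue = boto3.client("glue")

# Crawler targeting the parquet results written by the Glue job.
glue.create_crawler(
    Name="LastFM-crawler",
    Role="GlueLastFMRole",  # hypothetical role name
    DatabaseName="lastfm",  # assumed Glue catalog database
    Targets={"S3Targets": [{"Path": "s3://lastfm-dataset/results/"}]},
)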

SNS topic

We will also need to set up an email subscription to an SNS topic, following the AWS docs, so that notifications are sent to an email address. We will add a separate task at the end of the Step Function workflow to publish to the SNS topic. Alternatively, one could configure S3 event notifications for specific S3 key prefixes so that any parquet output written by the Glue job publishes to the SNS topic destination.
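
If you prefer to script this step, the sketch below creates the topic and an email subscription with boto3; the topic name matches the Default topic referenced in the state machine definition later, and the email address is a placeholder.

import boto3

sns = boto3.client("sns")

# Create the topic and subscribe an email endpoint to it.
topic = sns.create_topic(Name="Default")
sns.subscribe(
    TopicArn=topic["TopicArn"],
    Protocol="email",
    Endpoint="you@example.com",  # placeholder address to notify
)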

Once you have set up the subscription, you should get an email notification asking you to confirm it, as below:

(Image: SNS subscription confirmation email)

Step Function setup and execution

Now we need to create a state machine. This is a workflow in AWS Step Functions consisting of a set of states, each of which represents a single unit of work. The state machine is defined in Amazon States Language, a JSON-based notation. The definition for this example is shown below; we will use it when creating the state machine in the console.

{
  "Comment": "Glue ETL flights pipeline execution",
  "StartAt": "Glue StartJobRun",
  "States": {
    "Glue StartJobRun": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun",
      "Parameters": {
        "JobName": "LastFM_Analysis",
        "MaxCapacity": 2
      },
      "ResultPath": "$.gluejobresults",
      "Next": "Wait"
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "Get Glue Job status"
    },
    "Get Glue Job status": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:getJobRun",
      "Parameters": {
        "JobName.$": "$.gluejobresults.JobName",
        "RunId.$": "$.gluejobresults.JobRunId"
      },
      "Next": "Check Glue Job status",
      "ResultPath": "$.gluejobresults.status"
    },
    "Check Glue Job status": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.gluejobresults.status.JobRun.JobRunState",
          "StringEquals": "SUCCEEDED",
          "Next": "StartCrawler"
        }
      ],
      "Default": "Wait"
    },
    "StartCrawler": {
      "Type": "Task",
      "Parameters": {
        "Name": "LastFM-crawler"
      },
      "Resource": "arn:aws:states:::aws-sdk:glue:startCrawler",
      "Next": "Wait for crawler to complete"
    },
    "Wait for crawler to complete": {
      "Type": "Wait",
      "Seconds": 70,
      "Next": "SNS Publish Success"
    },
    "SNS Publish Success": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:*:Default",
        "Message.$": "$"
      },
      "Next": "Success"
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}

Before creating the state machine, we also need to create a role for Step Functions to assume, with permissions to call the various services, e.g. Glue, Athena, SNS and CloudWatch (if logging is enabled when creating the state machine), using AWS managed policies as below.

(Image: IAM role for Step Functions with AWS managed policies attached)

In the Step Functions console, in the State Machine tab:

  • Select Create State Machine
  • Select "Write your workflow in code" with Type "Standard"
  • Paste in the state language specification. This will generate a visual representation of the state machine as below, if the definition is valid.

(Image: visual representation of the state machine)

  • Select Next and then, in the "Specify Details" section, fill in the state machine name, select the execution role created previously from the dropdown and turn on logging to CloudWatch. Then click "Create State Machine".

(Image: Specify Details settings for the state machine)
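
The state machine could also be created with boto3 rather than the console. In the sketch below the state machine name is an assumption, the definition is the ASL JSON shown above saved to a local file, and the role ARN is a placeholder.

import boto3

sfn = boto3.client("stepfunctions")

# The ASL definition above, saved locally as a JSON file (hypothetical path).
with open("state_machine.json") as f:
    definition = f.read()

sfn.create_state_machine(
    name="LastFM-ETL-Workflow",  # hypothetical name
    definition=definition,
    roleArn="arn:aws:iam::123456789012:role/StepFunctionsLastFMRole",  # placeholder ARN
    type="STANDARD",
)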

Let us go through what each of the states does. The first task state, Glue StartJobRun, starts the Glue job LastFM_Analysis with 2 data processing units (DPUs) of capacity, as specified in the Parameters block. The output of this state is included under ResultPath as $.gluejobresults, alongside the original input. This gives subsequent states access to Glue job run metadata, such as the job name and run id, to use as parameters.

The next state is a Wait state, which pauses execution of the state machine for 30 seconds before proceeding to the task that checks the status of the Glue job run. Using a Choice state, we add a condition to proceed to the next task (StartCrawler) if the job run status is SUCCEEDED; otherwise the workflow loops back to the Wait state and waits another 30 seconds before repeating the check. This ensures we only start crawling the data in S3 once the Glue job has completed and the output parquet files are available to be crawled.

Similarly, after the StartCrawler task, we add a Wait state that pauses the step function for 70 seconds (we expect the crawler to complete within a minute), so that the notification is published to the SNS topic Default only after the crawler has had time to finish.

Now the state machine can be executed. If the step function completes successfully, we should see an output similar to below.

(Image: successful execution of the state machine)

If the Glue job is successful, we should see the parquet files in dedicated subfolders under the results prefix in the S3 bucket. You should also get a notification at the email address subscribed to the SNS topic.

(Image: parquet output files in the S3 results prefix)

(Image: SNS email notification)

The catalog tables should be created after the crawler completes successfully. We can now query the tables in Athena as below. The tables could also be accessed from Amazon QuickSight or Tableau to build visualisation dashboards for further insights.
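
Queries can also be submitted programmatically via the Athena API. The sketch below uses boto3, with the database, table and output location as assumptions based on what the crawler creates.

import boto3

athena = boto3.client("athena")

# Run a query against one of the crawled tables and print the execution id.
response = athena.start_query_execution(
    QueryString="SELECT * FROM popular_songs ORDER BY countplayed DESC LIMIT 10",
    QueryExecutionContext={"Database": "lastfm"},  # assumed database name
    ResultConfiguration={"OutputLocation": "s3://lastfm-dataset/athena-results/"},
)
print(response["QueryExecutionId"])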

(Image: Athena query results)

(Image: Athena query results)
