Amazon EMR Serverless is a new AWS service that became generally available on June 1st, 2022. With it, you can run serverless Spark applications that easily process TB-scale data and use any open source Spark library. Getting started with EMR Serverless can be a bit tricky. The goal of this post is to help you get your Spark+Delta jobs up and running "serverlessly". Let's get to it!
Setup - Authentication
In order to run EMR Serverless you'll need to configure two IAM roles: a service-linked role and an access authorization role for your Spark jobs. The service-linked role is very straightforward to create. Go to IAM on the AWS console, click on Roles and then on Create role, choose Amazon EMR Serverless, and keep the default settings until you finish creating the role. Next, create a job role with permissions to access S3 and Glue. We will create a very open role (not the best practice) for didactic purposes. In a "production" environment, you should make your permissions very strict.
Click Create role again in the Roles section of the AWS console and select Custom trust policy. Below, in the "Service" key of the policy document, replace "{}" with "emr-serverless.amazonaws.com".
Next, select two AWS managed policies, "AmazonS3FullAccess" and "AWSGlueConsoleFullAccess". Click Next, give your new role an easily identifiable name (like "EMRServerlessJobRole") and finish creating the role.
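For readers who prefer scripting over the console, the trust policy described above can be sketched in Python. This is a minimal sketch, not the only way to do it: the function names are hypothetical, and create_job_role assumes boto3 is installed and AWS credentials with IAM permissions are configured. It is defined but not called here.

```python
import json


def build_trust_policy(service="emr-serverless.amazonaws.com"):
    """Return a trust policy that lets the given service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_job_role(role_name="EMRServerlessJobRole"):
    """Create the job role and attach the two managed policies.

    Assumption: boto3 is installed and credentials are configured.
    """
    import boto3  # imported lazily so the policy builder works without boto3

    iam = boto3.client("iam")
    iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(build_trust_policy()),
    )
    # Same very open permissions as in the console walkthrough.
    for policy in ("AmazonS3FullAccess", "AWSGlueConsoleFullAccess"):
        iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn=f"arn:aws:iam::aws:policy/{policy}",
        )


if __name__ == "__main__":
    # Print the document you would paste into the Custom trust policy editor.
    print(json.dumps(build_trust_policy(), indent=2))
```

As in the console walkthrough, these permissions are deliberately broad and should be narrowed before any production use.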
Setup - Data
For this post, we are working with the (very famous) Titanic dataset, which you can download here and upload to S3.
Data Pipeline Strategy
Delta Lake is a great tool that implements the Lakehouse architecture. It has many cool features (such as schema evolution, data time travel, transaction logs, ACID transactions) and it is especially valuable for incremental data ingestion. Thus, we are going to simulate some changes in the Titanic dataset. We will include two new passengers (Ney and Sarah) and we will update the information on two passengers that were presumed dead but found alive(!!!), Mr. Owen Braund and Mr. William Allen.
The first version of the data is written as a Delta table, and the updates are written with an upsert transaction.
The Python code for those operations is presented below:
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()
print("Reading CSV file from S3...")
schema = "PassengerId int, Survived int, Pclass int, Name string, Sex string, Age double, SibSp int, Parch int, Ticket string, Fare double, Cabin string, Embarked string"
df = spark.read.csv(
    "s3://<YOUR-BUCKET>/titanic",
    header=True, schema=schema, sep=";"
)
print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")
print("Updating and inserting new rows...")
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
newrows = [
    (892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
    (893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None)
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)
print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")
print("UPSERT...")
# UPSERT: update rows whose PassengerId matches, insert the rest
(
    old.alias("old")
    .merge(
        new.alias("new"),
        "old.PassengerId = new.PassengerId"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
print("Checking if everything is ok")
print("New data...")
(
    spark.read.format("delta")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)
print("Old data - with time travel")
(
    spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)
This .py file should be uploaded to S3.
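To make the upsert step in the script above more concrete, here is a plain-Python sketch of what a merge with whenMatchedUpdateAll and whenNotMatchedInsertAll does conceptually. It is only an illustration on dictionaries, not how Delta Lake implements the operation, and the upsert helper and the reduced set of columns are assumptions made for the example.

```python
def upsert(old_rows, new_rows, key="PassengerId"):
    """Merge new_rows into old_rows by key.

    Rows whose key already exists are fully replaced (update all);
    rows with an unseen key are appended (insert all).
    """
    merged = {row[key]: dict(row) for row in old_rows}
    for row in new_rows:
        merged[row[key]] = dict(row)
    return [merged[k] for k in sorted(merged)]


# Reduced, illustrative rows: only three of the dataset columns are kept.
old = [
    {"PassengerId": 1, "Name": "Mr. Owen Braund", "Survived": 0},
    {"PassengerId": 5, "Name": "Mr. William Allen", "Survived": 0},
]
new = [
    {"PassengerId": 1, "Name": "Mr. Owen Braund", "Survived": 1},
    {"PassengerId": 892, "Name": "Sarah Crepalde", "Survived": 1},
]

result = upsert(old, new)
for row in result:
    print(row)
```

Passenger 1 is updated, passenger 5 is left untouched because no incoming row matches it, and passenger 892 is inserted. The input list is not modified, which loosely mirrors how the original table version remains readable through time travel.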
Dependencies
One limitation of the latest EMR Serverless release available (6.6.0) is that the spark-submit flag --packages is not available yet. So, we need an extra step to use Java dependencies and Python dependencies.
Jars
To use Java dependencies, we have to build them manually into a single .jar file. AWS has provided a Dockerfile that we can use to build the dependencies without having to install Maven locally. I used this pom.xml file to define the dependencies:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.serverless-samples</groupId>
<artifactId>jars</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>jars</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>1.2.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<finalName>uber-${project.artifactId}-${project.version}</finalName>
</configuration>
</plugin>
</plugins>
</build>
</project>
We build the final .jar file with the command
docker build -f Dockerfile.jars --output . .
The output, uber-jars-1.0-SNAPSHOT.jar, must be uploaded to S3.
With this .jar file, we can use .format("delta") in our Python code, but if we try to import delta.tables we will get a Python dependency error.
Python
We can provide Python dependencies in two ways: uploading the dependency files to S3 or building a virtual environment to use in EMR Serverless. For this post, uploading a zip file with the Delta Python library was the simplest option.
mkdir dependencies
pip install delta-spark==1.2.1 --target dependencies
cd dependencies
zip -r9 ../emrserverless_dependencies.zip .
The emrserverless_dependencies.zip file must also be uploaded to S3.
Now, we are ready to configure our serverless Spark application.
EMR Serverless
First, we must create an EMR Studio. If you don't have any studios created yet, this is very straightforward. After clicking Get started on the EMR Serverless home page, you can click to create a studio automatically.
Then click the studio URL to enter it.
With EMR Serverless, we don't have to create a cluster. Instead, we work with the application concept. To create a new EMR Serverless application, click Create application, type an application name, select the release version and click Create application again at the bottom of the page.
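The application can also be created programmatically instead of through the console. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured. The application name delta-titanic and the function names are hypothetical, and create_application is defined but not called here.

```python
def build_application_request(name, release_label="emr-6.6.0"):
    """Build the arguments for creating a Spark application."""
    return {
        "name": name,
        "releaseLabel": release_label,
        "type": "SPARK",
    }


def create_application(name):
    """Create the application and return its ID.

    Assumption: boto3 is installed and credentials are configured.
    """
    import boto3  # imported lazily so the request builder works without boto3

    client = boto3.client("emr-serverless")
    response = client.create_application(**build_application_request(name))
    return response["applicationId"]


if __name__ == "__main__":
    # "delta-titanic" is a hypothetical application name.
    print(build_application_request("delta-titanic"))
```

The returned application ID is the value used for --application-id when submitting a job.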
Now, the last thing to do is to submit a Spark job. If you have the AWS CLI installed, the command below will submit a Spark job.
aws emr-serverless start-job-run \
--name Delta-Upsert \
--application-id <YOUR-APPLICATION-ID> \
--execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py",
"sparkSubmitParameters": "--jars s3://<YOUR-BUCKET>/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar --conf spark.submit.pyFiles=s3://<YOUR-BUCKET>/pyspark/dependencies/emrserverless_dependencies.zip"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"}
}
}'
Some important parameters:
- entryPoint sets the S3 path of your PySpark script
- sparkSubmitParameters: you should add the Java dependencies with the --jars flag and set --conf spark.submit.pyFiles=<YOUR .py/.zip/.egg FILE>
- s3MonitoringConfiguration sets the S3 path that will be used to save job logs.
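The same submission can be sketched in Python. This is a hedged sketch that mirrors the CLI call above, assuming boto3 is installed and credentials are configured. The function names are hypothetical, the S3 layout under pyspark/ follows the paths used in this post, and submit is defined but not called here.

```python
def build_job_run_request(bucket, application_id, account_id,
                          role_name="EMRServerlessJobRole"):
    """Build the arguments for a job run equivalent to the CLI command."""
    prefix = f"s3://{bucket}/pyspark"
    submit_parameters = " ".join([
        # Java dependencies: the uber jar built with Maven.
        f"--jars {prefix}/jars/uber-jars-1.0-SNAPSHOT.jar",
        # Python dependencies: the zip containing the delta package.
        f"--conf spark.submit.pyFiles={prefix}/dependencies/emrserverless_dependencies.zip",
    ])
    return {
        "name": "Delta-Upsert",
        "applicationId": application_id,
        "executionRoleArn": f"arn:aws:iam::{account_id}:role/{role_name}",
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": f"{prefix}/emrserverless_delta_titanic.py",
                "sparkSubmitParameters": submit_parameters,
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {
                    "logUri": f"s3://{bucket}/emr-serverless-logs/"
                }
            }
        },
    }


def submit(request):
    """Start the job run and return its ID.

    Assumption: boto3 is installed and credentials are configured.
    """
    import boto3  # imported lazily so the request builder works without boto3

    client = boto3.client("emr-serverless")
    return client.start_job_run(**request)["jobRunId"]
```

Building the request separately from sending it makes it easy to print and inspect the payload before anything is submitted.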
If you wish to use the console instead, set the job name, role and script location, and then the .jar file and .zip file locations, in the job submission form.
The Spark job should start after this. When it finishes, check the logs folder in S3 (look for your application ID, job ID and SPARK_DRIVER logs). You should see something like this:
Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 1| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 1| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
| 889| 0| 3|"Johnston, Miss. ...|female|null| 1| 2| W./C. 6607| 23.45| null| S|
| 890| 1| 1|Behr, Mr. Karl Ho...| male|26.0| 0| 0| 111369| 30.0| C148| C|
| 891| 0| 3| Dooley, Mr. Patrick| male|32.0| 0| 0| 370376| 7.75| null| Q|
| 892| 1| 1| Sarah Crepalde|female|23.0| 1| 0| null| null| null| null|
| 893| 0| 1| Ney Crepalde| male|35.0| 1| 0| null| null| null| null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
| 889| 0| 3|"Johnston, Miss. ...|female|null| 1| 2| W./C. 6607| 23.45| null| S|
| 890| 1| 1|Behr, Mr. Karl Ho...| male|26.0| 0| 0| 111369| 30.0| C148| C|
| 891| 0| 3| Dooley, Mr. Patrick| male|32.0| 0| 0| 370376| 7.75| null| Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Notice the same Delta table shown at two different moments using time travel: the first table is the latest version, with Mr. Braund and Mr. Allen marked as alive and the new passengers added, and the second table is the original version of the dataset.
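As a small aid for finding the output shown above, the sketch below builds the S3 location of the driver's standard output from the application ID and job ID mentioned earlier. The folder layout (applications/, jobs/, SPARK_DRIVER/stdout.gz) is an assumption based on how EMR Serverless arranged the logs at the time of writing, so verify it against your own bucket. The function name and the sample IDs are hypothetical.

```python
def driver_stdout_uri(log_uri, application_id, job_run_id):
    """Return the assumed S3 URI of the gzipped driver stdout for a job run."""
    base = log_uri.rstrip("/")
    return (
        f"{base}/applications/{application_id}"
        f"/jobs/{job_run_id}/SPARK_DRIVER/stdout.gz"
    )


if __name__ == "__main__":
    # Hypothetical IDs, for illustration only.
    print(driver_stdout_uri("s3://my-bucket/emr-serverless-logs/", "app-123", "job-456"))
```

The file is gzip-compressed, so it needs to be decompressed after download before the tables become readable.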