DEV Community

Batch Processing using PySpark on AWS EMR

Are you a data engineer who wants hands-on practice with AWS services? This post walks through batch data processing on AWS, using S3 for storage, EMR as the processing cluster, and Athena for querying the processed results.

About Batch Data Pipeline:

The Wikipedia Activity data will be put into a folder in the S3 bucket. We will have PySpark code that will run on the EMR cluster. This code will fetch the data from the S3 bucket, perform filtering and aggregation on this data, and push the processed data back into S3 in another folder. We will then use Athena to query this processed data present in S3. We will create a table on top of the processed data by providing the relevant schema and then use ANSI SQL to query the data.

Architecture Diagram:

Architecture Diagram

Languages - Python
Package - PySpark
Services - AWS EMR, AWS S3, AWS Athena.

Dataset:

We'll be using the Wikipedia activity logs JSON dataset, in which each record is a large payload comprising 15+ fields.

NOTE: The script applies two conditions: we keep only those payloads where isRobot is False and the user's country is the United States.

dataset

Steps of Working:

1- Create an S3 bucket with a suitable name, e.g. emr-batchprocessing-raw-useast1-<Account ID>-dev, and inside the bucket create the following folders:

  • input-source (upload your dataset here; this is the source folder),

  • output-destination (the data processed by AWS EMR will be written here for further processing), and

  • logs (AWS EMR logs will be saved here; we'll specify this directory when creating the EMR cluster)

S3 bucket Creation
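The same bucket layout can also be created from code. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured; the helper names `bucket_name` and `create_layout` are hypothetical, and the account ID in the usage comment is a placeholder. The client is passed in as an argument so the logic can be tried without touching AWS.

```python
# Folder prefixes from step 1. S3 has no real folders, so each one is
# created as a zero-byte object whose key ends with "/".
FOLDERS = ("input-source/", "output-destination/", "logs/")


def bucket_name(account_id, region_tag="useast1", env="dev"):
    """Build a bucket name following the naming pattern shown in step 1."""
    return f"emr-batchprocessing-raw-{region_tag}-{account_id}-{env}"


def create_layout(s3_client, bucket):
    """Create the bucket and its three folder prefixes; return their S3 URIs."""
    # In us-east-1, create_bucket needs no CreateBucketConfiguration argument.
    s3_client.create_bucket(Bucket=bucket)
    for key in FOLDERS:
        s3_client.put_object(Bucket=bucket, Key=key)
    return [f"s3://{bucket}/{key}" for key in FOLDERS]


# Usage (requires AWS credentials; the account ID is a placeholder):
#   import boto3
#   s3 = boto3.client("s3", region_name="us-east-1")
#   create_layout(s3, bucket_name("123456789012"))
```

Passing the client in, rather than creating it inside the function, is only a design choice for this sketch; it keeps the naming logic separate from the AWS calls.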

2- Go to Key Pairs in the EC2 console and create a key pair to use when creating the EMR cluster.

EC2 key pair

3- Now we have to create an AWS EMR cluster. To do that, go to AWS EMR from the AWS Console and choose EMR on EC2: Clusters.

EMR cluster creation

4- During cluster creation, provide a suitable name (in my case, "emr-batch-processing") and select Spark as the processing engine.

5- EMR stands for Elastic MapReduce and works on the principle of distributed processing, so we need a master node and worker/core nodes for processing. Note: I removed the Task nodes here during creation.

6- For cluster scaling and provisioning, let's go with 2 worker/core nodes, since our workload is minimal and this keeps the setup realistic.

EMR Scaling

NOTE: Keep the default settings for _Networking_ & Cluster termination.

7- Select the EC2 key pair that we created in Step 2 so that we can SSH into the cluster from the terminal.

8- For cluster logs, choose the logs folder that we created during the bucket creation step.

EMR logs directory

9- Lastly, we need to create the Amazon EMR service role for Identity and Access Management (IAM) and, similarly, the EC2 instance profile for Amazon EMR. After that, review the steps and click on Create EMR Cluster.

Service Role & Instance Profile
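For reference, the console choices from steps 3 to 9 map roughly onto the arguments of boto3's `run_job_flow` call. This is a sketch under stated assumptions, not the configuration used in this post: the release label, instance type, and the default role names `EMR_DefaultRole` and `EMR_EC2_DefaultRole` are assumptions, and `build_cluster_request` is a hypothetical helper.

```python
def build_cluster_request(bucket, key_name,
                          release_label="emr-6.15.0",   # assumed release
                          instance_type="m5.xlarge"):   # assumed instance type
    """Return keyword arguments for boto3's emr.run_job_flow()."""
    return {
        "Name": "emr-batch-processing",            # cluster name from step 4
        "LogUri": f"s3://{bucket}/logs/",          # logs folder from step 8
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Spark"}],       # Spark as processing engine
        "Instances": {
            "InstanceGroups": [
                # One master (primary) node.
                {"Name": "Primary", "InstanceRole": "MASTER",
                 "InstanceType": instance_type, "InstanceCount": 1},
                # Two core nodes, as in step 6; no task nodes.
                {"Name": "Core", "InstanceRole": "CORE",
                 "InstanceType": instance_type, "InstanceCount": 2},
            ],
            "Ec2KeyName": key_name,                # key pair from step 2
            "KeepJobFlowAliveWhenNoSteps": True,   # keep cluster up for SSH
        },
        "ServiceRole": "EMR_DefaultRole",          # assumed role name
        "JobFlowRole": "EMR_EC2_DefaultRole",      # assumed instance profile
    }


# Usage (requires AWS credentials; bucket and key names are placeholders):
#   import boto3
#   emr = boto3.client("emr", region_name="us-east-1")
#   response = emr.run_job_flow(**build_cluster_request("my-bucket", "my-key"))
#   cluster_id = response["JobFlowId"]
```

If you created roles with different names in the console, substitute those names for the two role values.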

10- Let's create the script that we want to run on our EMR cluster. NOTE: The code given below is only the filtering script; for the full scripts, please refer to GitHub.

GitHub link: https://github.com/aiwithqasim/emr-batch-processing



from pyspark.sql import SparkSession

# Replace the placeholders with the S3 paths of your source and output folders.
S3_DATA_INPUT_PATH="<<bucket link to source dataset>>"
S3_DATA_OUTPUT_PATH_FILTERED="<<bucket link to output folder>>/filtered"

def main():
    spark = SparkSession.builder.appName('EMRBatchProcessing').getOrCreate()
    # Read the raw JSON activity logs from the source folder.
    df = spark.read.json(S3_DATA_INPUT_PATH)
    print(f'The total number of records in the source data set is {df.count()}')
    # Keep only non-robot edits made by users in the United States.
    filtered_df = df.filter((df.isRobot == False) & (df.countryName == 'United States'))
    print(f'The total number of records in the filtered data set is {filtered_df.count()}')
    # Preview a few rows and the inferred schema in the terminal.
    filtered_df.show(10)
    filtered_df.printSchema()
    # Write the filtered data as Parquet, replacing any previous output.
    filtered_df.write.mode('overwrite').parquet(S3_DATA_OUTPUT_PATH_FILTERED)
    print('The filtered output is uploaded successfully')

if __name__ == '__main__':
    main()



11- Make sure the EMR cluster you created has SSH port 22 open so that you can connect from a local _Terminal_ or _PuTTY_.
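If port 22 is not open yet, an inbound rule can be added to the security group attached to the cluster's master node. The following is a hedged sketch using boto3's `authorize_security_group_ingress`; the helper names, the security group ID, and the IP address in the usage comment are placeholders, and restricting the rule to a single IPv4 address is a choice made for this example.

```python
import ipaddress


def ssh_ingress_rule(my_ip):
    """Build an inbound rule allowing SSH (TCP port 22) from one IPv4 address."""
    # Raises ValueError if my_ip is not a valid IPv4 address.
    address = ipaddress.IPv4Address(my_ip)
    return [{
        "IpProtocol": "tcp",
        "FromPort": 22,
        "ToPort": 22,
        "IpRanges": [{"CidrIp": f"{address}/32",
                      "Description": "SSH from my workstation"}],
    }]


def open_ssh(ec2_client, security_group_id, my_ip):
    """Attach the SSH rule to the given security group."""
    ec2_client.authorize_security_group_ingress(
        GroupId=security_group_id,
        IpPermissions=ssh_ingress_rule(my_ip),
    )


# Usage (requires AWS credentials; the group ID and IP are placeholders):
#   import boto3
#   ec2 = boto3.client("ec2", region_name="us-east-1")
#   open_ssh(ec2, "sg-0123456789abcdef0", "203.0.113.10")
```

Limiting the source to a /32 address keeps the port closed to everyone else, which is usually preferable to opening it to all addresses.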

12- Connect to your AWS EMR EC2 instance using the connection command as shown below:

connecting to AWS EMR

13- Create the main script, main.py, on the cluster using a Linux command (paste in the script that we created above) and submit it using spark-submit main.py

creating & submitting spark script
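As an alternative to connecting over SSH, the same script can be submitted as an EMR step. This is a minimal sketch, assuming the script has first been uploaded to S3; the step name, the script URI, and the cluster ID in the usage comment are hypothetical, and the helper functions are not part of the original project.

```python
def spark_submit_step(script_s3_uri):
    """Describe an EMR step that runs spark-submit on a script stored in S3."""
    return {
        "Name": "wikipedia-filter",           # hypothetical step name
        "ActionOnFailure": "CONTINUE",        # keep the cluster running on failure
        "HadoopJarStep": {
            "Jar": "command-runner.jar",      # EMR's built-in command runner
            "Args": ["spark-submit", "--deploy-mode", "cluster", script_s3_uri],
        },
    }


def submit(emr_client, cluster_id, script_s3_uri):
    """Add the step to a running cluster and return the new step ID."""
    response = emr_client.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[spark_submit_step(script_s3_uri)],
    )
    return response["StepIds"][0]


# Usage (requires AWS credentials; the IDs and paths are placeholders):
#   import boto3
#   emr = boto3.client("emr", region_name="us-east-1")
#   submit(emr, "j-XXXXXXXXXXXXX", "s3://my-bucket/scripts/main.py")
```

Note that in cluster deploy mode the script's print output goes to the step and driver logs rather than to your terminal.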

14- After completion, validate that the code ran successfully and that the terminal shows the printed schema, as shown below

code completion validation

and also that the S3 bucket has the processed data.

Processed data in S3

15- Go to the AWS Athena query editor

  • Create a table using data in the S3 output folder (processed data)

  • Make sure the table & database are created properly

  • Create a new query to select data from the table created

  • Make sure the query is returning the result properly.

Querying using Athena
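The Athena steps above can also be sketched in code. The example below builds a table definition over the Parquet output and starts a query through boto3's `start_query_execution`. It is an illustration under assumptions: only the two columns named in this post (isRobot and countryName) are declared, in lowercase on the assumption that Athena matches them to the Parquet column names, and the database name, table name, and S3 locations are hypothetical. The database is assumed to exist already.

```python
def create_table_ddl(database, table, location):
    """Build DDL for an external table over the filtered Parquet output."""
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table} (\n"
        "  isrobot boolean,\n"       # isRobot in the source data
        "  countryname string\n"     # countryName in the source data
        ")\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


def run_query(athena_client, sql, database, results_location):
    """Start a query in Athena and return its execution ID."""
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": results_location},
    )
    return response["QueryExecutionId"]


# Usage (requires AWS credentials; all names and paths are placeholders):
#   import boto3
#   athena = boto3.client("athena", region_name="us-east-1")
#   ddl = create_table_ddl("wiki_db", "wiki_filtered",
#                          "s3://my-bucket/output-destination/filtered/")
#   run_query(athena, ddl, "wiki_db", "s3://my-bucket/athena-results/")
#   run_query(athena, "SELECT * FROM wiki_filtered LIMIT 10",
#             "wiki_db", "s3://my-bucket/athena-results/")
```

Queries run asynchronously, so the returned execution ID is what you would use to check status and fetch results.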

Conclusion

In a professional data engineering career, you have various scenarios where data gets collected every day. The data can be processed once a day, i.e., batch processed, and the processed results are stored in a location to derive insights and take appropriate action based on the insights. In this blog, we have implemented a batch-processing pipeline using AWS services. We have taken a day’s worth of data related to Wikipedia, and performed batch processing on it.
