In a data-driven organization, teams constantly need access to large amounts of data. Many tools can help with this, and choosing among them can be challenging and overwhelming at times. A useful principle to keep in mind is that there is no single right tool or architecture; it depends on what you need.
In this guide, I’m going to show you how to build a simple event-driven data pipeline in AWS. Pipelines are often scheduled or interval-based. An event-driven pipeline instead starts when something happens, which makes it a good starting point. Instead of trying to work out the right activation interval, you use an event handler that activates your pipeline in response to specific events.
To learn more about which problem we were solving in Traindex and why the data pipeline was the right choice for us, refer to my previous article Introduction to Data Pipelines.
As an example, we will use the “Sentiment140 dataset with 1.6 million tweets”, which is available on Kaggle. Our goal is to set up a data preprocessing pipeline. Once you upload the CSV file to a specified bucket, an event is generated. A Lambda function handles that event and activates your pipeline. The pipeline itself is AWS Data Pipeline, a web service that helps you process and move data between different AWS compute and storage services. It provisions a compute resource and runs your preprocessing code on that resource. Once your data is cleaned and preprocessed, the pipeline uploads it to the specified bucket for later use. Based on these objectives, we can divide our task into the following sub-tasks:
- Creating a pre-configured AMI
- Defining AWS data pipeline architecture
- Writing the event handler AWS Lambda function
- Integrating everything
Before diving into the steps, make sure the following prerequisites are met:
- An AWS account with the required IAM privileges
- A downloaded copy of the “Sentiment140 dataset with 1.6 million tweets”
- An active internet connection
Pre-Configured AMI
This step is optional depending on your requirements, but it is good to have a pre-configured AMI that you can use for the compute resources. Follow these steps to create one:
- Go to the AWS console, click on the Services dropdown menu, and select EC2
- On the EC2 dashboard, select Launch an Instance
- Select the Amazon Linux AMI 2018.03.0 (HVM), SSD Volume Type - ami-01fee56b22f308154
- Select the General Purpose t2.micro which is free-tier eligible
- Click on Review and Launch and then click Launch to launch this EC2 instance
- Now go to the EC2 dashboard and select your EC2 Instance. Copy the public DNS and SSH into your created instance.
- Now, install all the required packages, tools, and libraries in it using standard Linux commands.
- Also, set up any credentials you might require later like AWS credentials, etc.
- Once satisfied with your instance, it’s time to create an AMI image from this instance.
- Go to EC2 dashboard, right-click on your instance. Click on Actions, select Image, and click on create an image.
- Keep the default settings and create the image by clicking on Create Image.
- It’ll take a couple of minutes and once it’s done, go ahead and terminate the instance you created. You will only need the AMI ID in the next phases.
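If you would rather script the image creation than click through the console, the same request can be made with boto3. This is only a minimal sketch and not part of the original setup: the helper name create_ami, the image name, and the description are placeholders of mine, and the client is passed in as an argument so the function can be tried against a stub without AWS credentials.

```python
def create_ami(ec2_client, instance_id, name, wait=True):
    """Request an AMI from an existing EC2 instance and return its image ID.

    ec2_client is expected to be boto3.client("ec2") or a compatible stub.
    """
    response = ec2_client.create_image(
        InstanceId=instance_id,
        Name=name,  # hypothetical image name, pick your own
        Description="Pre-configured image for the data pipeline",
    )
    image_id = response["ImageId"]
    if wait:
        # Block until the image is usable; this can take a few minutes.
        ec2_client.get_waiter("image_available").wait(ImageIds=[image_id])
    return image_id


# Usage (requires AWS credentials and the boto3 package):
#   import boto3
#   print(create_ami(boto3.client("ec2"), "i-xxxxxxxxxxxxxxxxx", "data-pipeline-ami"))
```

The returned ID is the value that goes into the imageId field of the pipeline definition later on.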
AWS Data Pipeline Architecture
The main idea behind this step is to set up a data pipeline that launches an EC2 instance when it is triggered. A bash script then runs on that instance; it is responsible for moving our raw data back and forth and for running our preprocessing Python script. This step can be further divided into three subsections, so let’s go through them.
AWS Data Pipeline Architecture Definition
First of all, let’s define the AWS data pipeline architecture. We can do so by writing a JSON file that defines and describes our data pipeline and provides it with all the required logic. I’ll try to break it down as much as required, but you can always refer to the documentation to explore more options. The data pipeline definition can contain different pieces of information, such as:
- Names, locations, and formats of your data sources
- Activities that transform the data
- The schedule for those activities
- Resources that run your activities and preconditions
- Preconditions that must be satisfied before the activities can be scheduled
- Ways to alert you with status updates as pipeline execution proceeds
We can express the data pipeline definition in three parts: objects, parameters, and values.
Objects
Below you can see the syntax of the definition.
{
  "objects" : [
    {
      "name1" : "value1",
      "name2" : "value2"
    },
    {
      "name1" : "value3",
      "name3" : "value4",
      "name4" : "value5"
    }
  ]
}
Following the above syntax we can place our required objects one by one. First of all, we need to define our pipeline object. We would be defining fields like ID, name, IAM and resource roles, path to save pipeline logs and schedule type. You can add or remove these fields based on your requirements and should look at the official documentation to know more about these and other fields.
{
  "id": "Default",
  "name": "Default",
  "failureAndRerunMode": "CASCADE",
  "resourceRole": "DataPipelineDefaultResourceRole",
  "role": "DataPipelineDefaultRole",
  "pipelineLogUri": "s3://automated-data-pipeline/logs/",
  "scheduleType": "ONDEMAND"
},
You can use this object with one change: the pipelineLogUri field. Set it to the path in the S3 bucket where you want to save your logs. The next object in our definition is the compute resource, i.e. the EC2 resource.
{
  "id": "MyEC2Resource",
  "type": "Ec2Resource",
  "imageId": "ami-xxxxxxxxxxxxxxxxx",
  "instanceType": "r5.large",
  "spotBidPrice": "2.0",
  "terminateAfter": "30 Minutes",
  "actionOnTaskFailure" : "terminate",
  "maximumRetries" : "1",
  "role": "DataPipelineDefaultRole",
  "resourceRole": "DataPipelineDefaultResourceRole",
  "keyPair" : "<YOUR-KEY>"
},
This object describes our compute needs: an EC2 instance of type r5.large on spot pricing, launched with your key pair. Remember to put the pre-configured AMI ID in the imageId field so the instance launches with all of your configuration in place. Now, let’s move on to the next and last object, the shell activity. This object runs our shell script, which in turn runs our preprocessing code.
{
  "id": "ShellCommandActivityObj",
  "name": "ShellCommandActivityObj",
  "type": "ShellCommandActivity",
  "command": "aws s3 cp s3://automated-data-pipeline/script.sh ~/ && sudo sh ~/script.sh #{myS3DataPath}",
  "maximumRetries": "0",
  "runsOn": {
    "ref": "MyEC2Resource"
  }
}
In this object, the two most important fields are command and runsOn. In the command field, you define the bash command to run on the EC2 instance described earlier. The command I used copies a bash script onto the EC2 instance and runs it. Note that I’m also passing it a parameter, #{myS3DataPath}, which is the path to the data we would like our pipeline to preprocess. It is given as a parameter to add flexibility to our pipeline so it can handle different data sets. The runsOn field takes the ID of the EC2 resource we created earlier so the shell command runs on that resource.
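To see how the three objects fit together, here is a minimal Python sketch that assembles them into one definition and checks that every reference points at an object that exists. The helper find_dangling_refs is a name I made up for illustration, and the EC2 resource is abbreviated to a few of the fields shown above.

```python
import json

default_object = {
    "id": "Default",
    "name": "Default",
    "failureAndRerunMode": "CASCADE",
    "resourceRole": "DataPipelineDefaultResourceRole",
    "role": "DataPipelineDefaultRole",
    "pipelineLogUri": "s3://automated-data-pipeline/logs/",
    "scheduleType": "ONDEMAND",
}
# Abbreviated: the full object above also sets roles, key pair, bid price, etc.
ec2_resource = {
    "id": "MyEC2Resource",
    "type": "Ec2Resource",
    "imageId": "ami-xxxxxxxxxxxxxxxxx",
    "instanceType": "r5.large",
    "terminateAfter": "30 Minutes",
}
shell_activity = {
    "id": "ShellCommandActivityObj",
    "name": "ShellCommandActivityObj",
    "type": "ShellCommandActivity",
    "command": "aws s3 cp s3://automated-data-pipeline/script.sh ~/ "
               "&& sudo sh ~/script.sh #{myS3DataPath}",
    "maximumRetries": "0",
    "runsOn": {"ref": "MyEC2Resource"},
}


def find_dangling_refs(objects):
    """Return (object id, field, ref) for every ref that names no object."""
    known_ids = {obj["id"] for obj in objects}
    dangling = []
    for obj in objects:
        for field, value in obj.items():
            if isinstance(value, dict) and "ref" in value:
                if value["ref"] not in known_ids:
                    dangling.append((obj["id"], field, value["ref"]))
    return dangling


definition = {"objects": [default_object, ec2_resource, shell_activity]}
problems = find_dangling_refs(definition["objects"])
print("dangling references:", problems)
print(json.dumps(definition, indent=2))
```

A check like this catches a mistyped runsOn ID locally, before the definition is ever sent to AWS.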
Parameters
Parameter placeholders should be written in the format #{myPlaceholder}. Every parameter name should start with the "my" prefix. Here is the parameter section of the definition JSON file:
"parameters": [
  {
    "id": "myS3DataPath",
    "name": "mys3DataPath",
    "description": "This is the path to the data uploaded",
    "type": "AWS::S3::ObjectKey"
  }
]
We have defined that our parameter should be AWS S3 object key type. The whole data pipeline definition can be found here.
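A quick way to catch a mismatch between the placeholders used in the objects and the parameters declared for them is a small check like the following. It is a sketch under my own assumptions: the helper name undeclared_placeholders is hypothetical, and the regular expression only looks for simple #{my...} placeholders, not for other expressions a definition may contain.

```python
import re

# Matches simple placeholders such as #{myS3DataPath}
PLACEHOLDER_RE = re.compile(r"#\{(my\w*)\}")


def undeclared_placeholders(definition):
    """Return placeholder names used in objects but missing from parameters."""
    declared = {param["id"] for param in definition.get("parameters", [])}
    used = set()
    for obj in definition.get("objects", []):
        for value in obj.values():
            if isinstance(value, str):
                used.update(PLACEHOLDER_RE.findall(value))
    return sorted(used - declared)


definition = {
    "objects": [
        {
            "id": "ShellCommandActivityObj",
            "type": "ShellCommandActivity",
            "command": "sudo sh ~/script.sh #{myS3DataPath}",
        }
    ],
    "parameters": [
        {"id": "myS3DataPath", "type": "AWS::S3::ObjectKey"}
    ],
}
print(undeclared_placeholders(definition))
```

An empty list means every placeholder has a matching parameter declaration.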
Now that the pipeline definition is written, create the pipeline with the following command. Note the pipeline ID it returns, because you will need it in the next steps.
aws datapipeline create-pipeline --name data-preprocessing-pipeline --unique-id data-preprocessing-pipeline
Once created, you can put the definition in place. Note that we can pass a temporary parameter value at this stage, which later can be passed dynamically.
aws datapipeline put-pipeline-definition \
    --pipeline-id <Your Pipeline ID> \
    --pipeline-definition file://definition.json \
    --parameter-values myS3DataPath=<s3://your/s3/data/path>
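The same two steps can also be done from Python with boto3. One thing to be aware of is that the API expects objects as a list of key/value fields rather than the JSON layout used in definition.json, so the file has to be translated first. The sketch below is my own approximation of that translation as I understand it; the helper names are hypothetical, and the client is passed in so the logic can be tried against a stub.

```python
def to_api_objects(objects):
    """Convert definition-file objects into the pipelineObjects API shape."""
    api_objects = []
    for obj in objects:
        fields = []
        for key, value in obj.items():
            if key in ("id", "name"):
                continue
            if isinstance(value, dict) and "ref" in value:
                # References to other objects use refValue
                fields.append({"key": key, "refValue": value["ref"]})
            else:
                fields.append({"key": key, "stringValue": str(value)})
        api_objects.append(
            {"id": obj["id"], "name": obj.get("name", obj["id"]), "fields": fields}
        )
    return api_objects


def to_api_parameters(parameters):
    """Convert definition-file parameters into the parameterObjects API shape."""
    return [
        {
            "id": param["id"],
            "attributes": [
                {"key": key, "stringValue": str(value)}
                for key, value in param.items()
                if key != "id"
            ],
        }
        for param in parameters
    ]


def create_and_define(client, name, definition, parameter_values):
    """Create a pipeline, upload its definition, and return the pipeline ID.

    client is expected to be boto3.client("datapipeline") or a stub.
    """
    pipeline_id = client.create_pipeline(name=name, uniqueId=name)["pipelineId"]
    result = client.put_pipeline_definition(
        pipelineId=pipeline_id,
        pipelineObjects=to_api_objects(definition["objects"]),
        parameterObjects=to_api_parameters(definition.get("parameters", [])),
        parameterValues=[
            {"id": key, "stringValue": value}
            for key, value in parameter_values.items()
        ],
    )
    if result["errored"]:
        raise RuntimeError(result["validationErrors"])
    return pipeline_id
```

With real credentials you would load definition.json with json.load and pass boto3.client("datapipeline") as the client.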
Since our data pipeline is defined and created, let’s write the bash script that will run in the compute resource of our data pipeline.
Bash Script
This script runs on the EC2 instance that the pipeline launches as its compute resource. It works in a few simple steps: it makes a new working directory, exports the working directory and the S3 path to the data as environment variables, copies the Python script into the working directory, runs it, and finally uploads the cleaned data back to S3. Here is the code you will need:
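#!/bin/bash
echo "Starting the process"
# Create a scratch directory that every user can write to
sudo mkdir -p ~/data-pipeline-tmp
sudo chmod ugo+rwx ~/data-pipeline-tmp
cd ~/data-pipeline-tmp
CURRENT_DIR=$(pwd)
# The S3 path of the uploaded data arrives as the first argument
DATA_PATH=$1
# Expose both values to the Python script
export WORKING_DIR=$CURRENT_DIR
export S3_DATA_PATH=$DATA_PATH
# Fetch and run the preprocessing script, then upload its output
aws s3 cp s3://automated-data-pipeline/scripts/script.py "$WORKING_DIR"
python3 "$WORKING_DIR/script.py"
aws s3 cp "$WORKING_DIR/twitter_data_cleaned.csv" s3://automated-data-pipeline/outputs/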
In my case, the S3 bucket is named automated-data-pipeline and I have made folders to separate different objects. This code can also be found here. Next is the Python code that will preprocess the data.
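Note that the script above hands the S3 path to Python instead of downloading the data itself, so pandas has to read directly from S3, which requires the s3fs package on the AMI. If you would rather download the file into the working directory first, something like the following sketch could do it. The helper names split_s3_uri and download_input are my own, and the client is passed in so the code can be tried with a stub.

```python
import os


def split_s3_uri(uri):
    """Split 's3://bucket/some/key.csv' into ('bucket', 'some/key.csv')."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not an S3 URI: {uri!r}")
    bucket, _, key = uri[len(prefix):].partition("/")
    if not bucket or not key:
        raise ValueError(f"S3 URI needs both a bucket and a key: {uri!r}")
    return bucket, key


def download_input(s3_client, uri, dest_dir):
    """Download the object at uri into dest_dir and return the local path.

    s3_client is expected to be boto3.client("s3") or a compatible stub.
    """
    bucket, key = split_s3_uri(uri)
    local_path = os.path.join(dest_dir, os.path.basename(key))
    s3_client.download_file(Bucket=bucket, Key=key, Filename=local_path)
    return local_path


print(split_s3_uri("s3://automated-data-pipeline/preprocess/data.csv"))
```

Inside the preprocessing script, the call would look roughly like download_input(boto3.client("s3"), os.environ["S3_DATA_PATH"], os.environ["WORKING_DIR"]), with the returned path passed to pd.read_csv.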
Python Code
This is the standard preprocessing code that we will use to clean our dataset. Here’s the code you will need. Feel free to add or remove anything according to your needs:
import os
import re

import nltk
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

# Download the stop-word list used below
nltk.download('stopwords')

# The bash script exports the S3 path of the uploaded data
path = os.environ["S3_DATA_PATH"]
print(f"\nInside Python Script\nPath = {path}\n")
print("Loading Data\n")
# The dataset has no header row, so the column names are supplied here
df = pd.read_csv(path, encoding="ISO-8859-1", names=["label", "id", "date", "flag", "user", "tweet"])
print(f"Data has {df.shape[0]} rows and {df.shape[1]} columns\n")

TEXT_CLEANING_RE = r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"
stop_words = stopwords.words("english")
stemmer = SnowballStemmer("english")

def preprocess(text, stem=False):
    # Remove link, user and special characters
    text = re.sub(TEXT_CLEANING_RE, ' ', str(text).lower()).strip()
    tokens = []
    for token in text.split():
        if token not in stop_words:
            if stem:
                tokens.append(stemmer.stem(token))
            else:
                tokens.append(token)
    return " ".join(tokens)

print("Starting cleaning process")
# The tweet text lives in the "tweet" column named above
df["tweet"] = df["tweet"].apply(preprocess)
print("Data cleaning completed, saving to CSV!\n")
df.to_csv("twitter_data_cleaned.csv", index=False)
You can also find this code here. You have now defined and created a data pipeline that works on its own, although it still needs manual activation. To make it event-driven, we need to write a cloud function that acts as a trigger. It will handle certain events and activate our pipeline when required.
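To get a feel for what the preprocess function above does to a single tweet, here is a small standalone sketch. It uses the same regular expression, but swaps the NLTK stop-word list for a tiny hand-written set so it runs without downloading anything; that set and the sample tweet are made up for illustration, and stemming is left out.

```python
import re

TEXT_CLEANING_RE = r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"
# Tiny stand-in for stopwords.words("english"), for illustration only
STOP_WORDS = {"i", "am", "the", "a", "is", "to"}


def preprocess(text):
    # Replace mentions, links and special characters with spaces
    text = re.sub(TEXT_CLEANING_RE, " ", str(text).lower()).strip()
    # Drop stop words and rejoin the remaining tokens
    return " ".join(token for token in text.split() if token not in STOP_WORDS)


sample = "@bob I am loving the new phone!! http://t.co/abc"
print(preprocess(sample))  # prints: loving new phone
```

The mention, the link, and the punctuation are removed, and only the words that carry meaning remain.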
Event Handler AWS Lambda Function
The title says that we will be using an AWS Lambda function for this step, but I like to use Chalice for it. You can use either as per your preference; the code will be almost the same. The following steps create a Chalice app that runs on AWS Lambda and triggers the data pipeline. You will need the ID of the pipeline you created earlier in this step.
- Create a Chalice app using chalice new-project <NAME>
- Once the project is initialized, open the app.py file
- Copy the contents of the following snippet into it. The code is also available here.
from chalice import Chalice
import boto3

app = Chalice(app_name='pipeline-trigger')
client = boto3.client("datapipeline")

# The pipeline you want to activate
PIPELINE_ID = "df-xxxxxxxxxxxxxxxxxxxx"

@app.on_s3_event(bucket='automated-data-pipeline', events=['s3:ObjectCreated:*'], prefix="preprocess/", suffix=".csv")
def activate_pipeline(event):
    app.log.debug(f"Received event for bucket: {event.bucket}, key: {event.key}")
    try:
        response = client.activate_pipeline(
            pipelineId=PIPELINE_ID,
            parameterValues=[
                {
                    "id": "myS3DataPath",
                    "stringValue": f"s3://{event.bucket}/{event.key}"
                }
            ]
        )
        app.log.debug(response)
    except Exception as e:
        app.log.critical(e)
- Change values such as PIPELINE_ID, the S3 bucket name, and the path prefix to match your setup
- Once done, deploy the Chalice app using chalice deploy
- If deployed successfully, go to the AWS console -> Lambda
- Select your Lambda function and go to the Permissions tab
- Click on the name of the Execution Role; it will open the IAM policy for that particular Lambda function
- Under the Permissions tab, click on the policy name to expand it
- Make sure that the policy has iam:PassRole and the proper Data Pipeline permissions
- To make life easier, the following IAM policy works fine
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "iam:PassRole",
        "datapipeline:*"
      ],
      "Resource": "*"
    },
    {
      "Sid": "VisualEditor1",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:*:logs:*:*:*"
    }
  ]
}
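Before relying on the deployed function, it can help to dry-run the activation logic locally. The sketch below pulls the core of the handler into a plain function and calls it with a stub in place of the real Data Pipeline client, so it needs neither credentials nor Chalice. The function names and the StubClient class are my own and only for illustration.

```python
def build_parameter_values(bucket, key):
    """Build the parameterValues list that fills in #{myS3DataPath}."""
    return [{"id": "myS3DataPath", "stringValue": f"s3://{bucket}/{key}"}]


def activate(client, pipeline_id, bucket, key):
    """Activate the pipeline for one uploaded object and return the response."""
    return client.activate_pipeline(
        pipelineId=pipeline_id,
        parameterValues=build_parameter_values(bucket, key),
    )


class StubClient:
    """Records calls instead of talking to AWS (illustration only)."""

    def __init__(self):
        self.calls = []

    def activate_pipeline(self, **kwargs):
        self.calls.append(kwargs)
        return {}


stub = StubClient()
activate(stub, "df-xxxxxxxxxxxxxxxxxxxx", "automated-data-pipeline", "preprocess/tweets.csv")
print(stub.calls[0]["parameterValues"])
```

The printed value is what the real client would receive, which makes it easy to confirm that the parameter ID matches the one declared in the pipeline definition.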
Testing
To test this pipeline, you need to upload the dataset to the S3 bucket path you specified in the trigger function. In my case the path is s3://automated-data-pipeline/preprocess/. This allows me to use the following command in my terminal to upload the data, then sit back and wait for the output in the S3 path I specified.
aws s3 cp ~/training.1600000.processed.noemoticon.csv s3://automated-data-pipeline/preprocess/
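The upload can also be done from Python. The following sketch additionally checks that the object key will actually match the prefix and suffix filter of the trigger, since a file uploaded under a different folder or extension would not start the pipeline. The helper names are hypothetical, and the client is passed in so the code can be tried with a stub.

```python
import os


def matches_trigger(key, prefix="preprocess/", suffix=".csv"):
    """Return True if an object key would pass the trigger's filters."""
    return key.startswith(prefix) and key.endswith(suffix)


def upload_dataset(s3_client, local_path, bucket, prefix="preprocess/"):
    """Upload a local file under prefix and return the object key.

    s3_client is expected to be boto3.client("s3") or a compatible stub.
    """
    key = prefix + os.path.basename(local_path)
    if not matches_trigger(key, prefix=prefix):
        raise ValueError(f"{key!r} would not trigger the pipeline")
    s3_client.upload_file(Filename=local_path, Bucket=bucket, Key=key)
    return key


print(matches_trigger("preprocess/training.1600000.processed.noemoticon.csv"))
print(matches_trigger("outputs/twitter_data_cleaned.csv"))
```

The second check is False, which is what we want: the cleaned file written to the outputs folder does not trigger the pipeline again.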
After the pipeline has run its course, it automatically terminates the resources attached to it so you don’t incur any unwanted bills. It uploads the cleaned data to your specified path, ready to be used. Now let’s compare the data before and after. This is what the data looked like in its raw form:
Here is what the data looks like after going through the pipeline once:
The difference is easy to see. You can also find the notebooks here if you want a closer look.
Conclusion
I know that there are a lot of steps involved in this process, but I assure you that once you have set up a pipeline like this, your life will be much easier. Still seems like a lot of work? Contact us at help@traindex.io to consult on any data engineering or data science problems you might be facing.