Streaming to AWS Kinesis Data Streams using Kinesis Agent - Step by Step Tutorial

Introduction:

The purpose of this tutorial is to establish a solid understanding of AWS Kinesis services and how to distinguish between AWS Kinesis Data Streams and AWS Kinesis Data Firehose. If you plan to take the AWS Solutions Architect Associate exam, it's very important to understand the differences and use cases of the AWS Kinesis services.

During the tutorial, we will spin up an Amazon Linux 2 instance and install the Kinesis Agent. We will configure the agent to send randomly generated numbers, produced by a Python script, to AWS Kinesis Data Streams. AWS Kinesis Data Firehose will then route the ingested stream to an S3 bucket for retention and further analysis.

The goal of the tutorial is to achieve a high level of comprehension of how to create AWS Kinesis Data Streams and AWS Kinesis Data Firehose, and how to put data into streams. The same concepts can therefore be applied to application logs, IoT sensor data, system metrics, videos, audio, analytics and more.


Architectural Diagram:

(Architecture: an EC2 instance running the Kinesis Agent sends data to a Kinesis data stream, which a Kinesis Data Firehose delivery stream routes to an S3 bucket.)


What is AWS Kinesis Data Streams?

AWS Kinesis Data Streams is an AWS service that ingests and processes data records from streams in real time. It provides accelerated data feed intake for application logs, metrics, videos, website clickstreams and more. For more information about AWS Kinesis Data Streams, please refer to AWS documentation.
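At the API level, producers write records to a stream along with a partition key that determines which shard receives each record. Below is a minimal, hedged boto3 sketch (not part of the tutorial steps); it assumes AWS credentials are configured and uses the stream-1 name and us-east-1 region we adopt later in the tutorial:

import json
import boto3

# Assumes credentials are configured and a stream named "stream-1"
# already exists in us-east-1 (the names used in this tutorial)
kinesis = boto3.client("kinesis", region_name="us-east-1")

response = kinesis.put_record(
    StreamName="stream-1",
    Data=json.dumps({"random": "42", "id": "stream-1"}).encode("utf-8"),
    PartitionKey="stream-1",  # determines which shard receives the record
)
print(response["SequenceNumber"])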


What is AWS Kinesis Data Firehose?

AWS Kinesis Data Firehose is a managed AWS service that delivers real-time streaming data records to a variety of destinations, including:

  • S3
  • Splunk
  • New Relic
  • Datadog
  • OpenSearch
  • LogicMonitor
  • MongoDB
  • any custom HTTP endpoint
  • and more

To read more about AWS Kinesis Data Firehose, please refer to AWS documentation.
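Firehose can also accept records pushed directly to a delivery stream. The sketch below is a hedged illustration of that API with a hypothetical delivery stream name; note that in this tutorial our delivery stream reads from a Kinesis data stream instead, so we never call this ourselves:

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

# Hypothetical delivery stream created with Direct PUT as its source;
# a delivery stream sourced from a Kinesis data stream (as in this
# tutorial) does not accept direct puts.
firehose.put_record(
    DeliveryStreamName="my-delivery-stream",
    Record={"Data": b"hello from firehose\n"},
)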


What is AWS Kinesis Agent?

It's a standalone Java-based application that collects monitored files/data/logs and sends them to Kinesis Data Streams. To read more about the Kinesis Agent's functionality, please refer to AWS documentation.


Tutorial Steps:

  1. Create an S3 bucket
  2. Create a Kinesis data stream
  3. Create a Kinesis data firehose
  4. Spin up an AWS EC2 instance
  5. Create an IAM role and attach it to the EC2 instance
  6. SSH into the EC2 instance
  7. Install an AWS Kinesis Agent
  8. Create a folder for the randomly generated numbers Python code
  9. Create a logfile.log to host the randomly generated numbers
  10. Configure the AWS Kinesis Agent to monitor the logfile.log
  11. Test the Python code and tail the logfile.log
  12. Run the Kinesis Agent
  13. Tail the Kinesis Agent log file
  14. Monitor Kinesis Data Streams dashboard
  15. Monitor Kinesis Data Firehose dashboard
  16. Download and open the file from the S3 bucket
  17. Do your victory dance 😉

Step 1: Create an S3 bucket

  • From the S3 console, click on Create bucket. Then, give the bucket a unique name and click on Create bucket.

NOTE: make sure you select us-east-1 for the region.


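For reference, the same step via the API, as a hedged sketch (bucket names are global, so the name below is a placeholder; in us-east-1 no LocationConstraint is required):

import boto3

s3 = boto3.client("s3", region_name="us-east-1")

# Hypothetical bucket name -- replace with your own unique name
s3.create_bucket(Bucket="your-unique-bucket-name")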


Step 2: Create a Kinesis Data Stream

  • From the AWS Kinesis console, select Kinesis Data Streams (it should be selected by default), then click Create data stream.


  • Type in stream-1 for the name of the data stream and leave the capacity mode at its default, On-demand.
  • Click Create data stream.

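The equivalent API call, as a hedged sketch assuming us-east-1 and the On-demand capacity mode selected above:

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

kinesis.create_stream(
    StreamName="stream-1",
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)

# create_stream is asynchronous; block until the stream is ACTIVE
kinesis.get_waiter("stream_exists").wait(StreamName="stream-1")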

Step 3: Create a Kinesis Data Firehose

  • While we are still in the AWS Kinesis console, let's select Delivery streams from the left-hand menu.


  • Click on Create delivery stream.
  • For the source, select Amazon Kinesis Data Streams, and for the destination, select Amazon S3.


  • Under Source settings, click Browse and choose the name of the Kinesis data stream, which is stream-1. Then, click Choose.


  • The Delivery stream name is generated randomly by AWS. Let's leave it as is.
  • Skip the Transform and convert records section.
  • Under Destination settings, click on Browse and choose the S3 bucket that we created in step 1.
  • Leave Dynamic partitioning at its default selections.
  • Under Buffer hints, compression and encryption, lower the Buffer interval from the default 300 seconds to 60 seconds.
  • Skip Advanced settings and click Create delivery stream.

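For completeness, a hedged sketch of the same step via the API. Unlike the console flow above, the API requires you to supply IAM roles that let Firehose read the stream and write to the bucket; the names and ARNs below are placeholders:

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

firehose.create_delivery_stream(
    DeliveryStreamName="KDS-S3-stream",  # hypothetical name
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/stream-1",
        "RoleARN": "arn:aws:iam::111122223333:role/firehose-read-role",
    },
    ExtendedS3DestinationConfiguration={
        "BucketARN": "arn:aws:s3:::your-unique-bucket-name",
        "RoleARN": "arn:aws:iam::111122223333:role/firehose-write-role",
        # Matches the 60-second buffer interval chosen in the console
        "BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 5},
    },
)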

Step 4: Spin up an Amazon Linux 2 instance

  • From the EC2 console, click on Launch Instance.


  • Step 1: Choose an Amazon Machine Image (AMI): select the first AMI on the list, Amazon Linux 2 AMI.
  • Step 2: Choose an Instance Type: keep the default selection which is t2.micro.
  • Step 3: Configure Instance Details: keep the default selections.
  • Step 4: Add Storage: keep the default selection.
  • Step 5: Add Tags: click on click to add a Name tag and enter the value Kinesis-Agent.
  • Step 6: Configure Security Group:

    A. Security group name: Kinesis-Agent-SG

    B. Description: Opens port 22 to my local IP

    C. For the existing SSH rule, change the source to My IP.


  • Step 7: Review Instance Launch: review and click on Launch.
  • On the modal screen, select Create a new key pair and name it kinesis-agent. Then, click on Download Key Pair to save the key pair to your local device. Finally, click Launch Instances to spin up the EC2 instance.



Step 5: Create an IAM role

  • From the IAM console, select Policies from the side menu.
  • Click on Create policy.
  • On the Create policy screen, select the JSON tab and paste the policy below into the JSON editor. Then, click Next: Tags.


{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData",
                "kinesis:PutRecords"
            ],
            "Resource": "*"
        }
    ]
}




NOTE: The permission the Kinesis Agent requires to put data into a stream is PutRecords. PutMetricData is needed for CloudWatch to publish metric data points, if CloudWatch monitoring is enabled for the agent. It's best practice to follow the principle of least privilege, so in a real deployment you would scope Resource down to your stream's ARN rather than *. Please refer to AWS documentation for more information.

  • Skip the optional Add tags screen by clicking Next: Review.
  • On the Review policy screen:

    A. Name: Kinesis-agent-policy

    B. Description: This policy allows the Kinesis agent installed on an EC2 instance to put data points into AWS Kinesis Data Streams. It also allows CloudWatch to publish metrics for the agent.

    C. Click on Create policy.


  • From the IAM console, click on Roles and click Create role.
  • On the Select trusted entity screen, choose AWS service and for Use case select EC2. Then, click Next.


  • On the Add permissions screen, filter policies by typing Kinesis-agent-policy and hitting enter. This is the policy that we previously created. Select it and click Next.


  • On Name, review and create screen:

    A. Role name: kinesis-agent-role

    B. Description: This role allows the EC2 instance that has Kinesis agent installed to call AWS Kinesis Data Streams.

    C. Scroll to the bottom and click on Create role.


  • Head to the EC2 console to attach the newly created IAM role to the EC2 instance.


  • From the IAM role drop-down menu, select kinesis-agent-role and click Save.



Steps 6-12: Install and Configure the AWS Kinesis Agent

Step 6: SSH into the EC2 instance

  • Let's change the key pair's permissions. This is needed on macOS and Linux (not Windows). From within the folder where you have saved the key:


chmod 400 kinesis-agent.cer


  • Now, we are ready to SSH into the EC2 instance:


ssh -i kinesis-agent.cer ec2-user@'instance-public-ip-address'





Step 7: Install the AWS Kinesis Agent

  • Before we install the Kinesis Agent, let's update the instance, as a best practice:


sudo yum update


  • Now, let's install the Kinesis Agent:


sudo yum install -y aws-kinesis-agent





Step 8: Create a folder for the randomly generated numbers Python code

  • Let's cd into the /opt directory, create a folder which will host our Python code, and then cd into it:


cd /opt/
sudo mkdir stream-1 
cd stream-1




  • We will open the nano text editor to create a file for our Python code:


sudo nano stream-1.py


  • Copy the below Python code and paste it into the nano text editor:


import logging
import json
from datetime import datetime
import calendar
import random
import time

# Log every generated value to logfile.log in the current folder
logging.basicConfig(filename='logfile.log', level=logging.DEBUG, format='%(asctime)s %(message)s')


def put_to_stream(id, value, timestamp):
    # Build the payload and write it to the log file,
    # where the Kinesis Agent will pick it up
    payload = {
                'random': str(value),
                'timestamp': str(timestamp),
                'id': id
              }

    response = 'Putting to stream: ' + str(payload)
    logging.debug(response)


while True:
    # Generate a random number between 1 and 100 with a UTC timestamp
    value = random.randint(1, 100)
    timestamp = calendar.timegm(datetime.utcnow().timetuple())
    id = 'stream-1'

    put_to_stream(id, value, timestamp)

    # wait for 5 seconds
    time.sleep(5)



NOTE: The Python code generates a random number between 1 and 100 every 5 seconds and appends it to logfile.log, which we will create in the same folder as the code. The code timestamps the output values and tags them with an id = stream-1. Once the Kinesis Agent is running, we will verify that our architecture is configured correctly by finding id = stream-1 in the files at the stream's destination, our S3 bucket. Tagging the data records this way makes identification efficient, especially if multiple producers write to the stream. A sketch of the resulting log line format appears after the save instructions below.


  • To save the Python code and exit nano:

    A. Press Ctrl+X.
    B. Press y to confirm.
    C. Press Enter.
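For reference, here is a hedged sketch of what one logged line will look like and how the payload could be parsed back into a Python dict (the timestamp and values are illustrative):

import ast

# An illustrative line as written by stream-1.py (values are made up)
line = "2022-08-08 20:15:05,123 Putting to stream: {'random': '42', 'timestamp': '1659989705', 'id': 'stream-1'}"

# The payload is the str() of a Python dict, so ast.literal_eval can parse it
payload = ast.literal_eval(line.split("Putting to stream: ", 1)[1])
print(payload['id'], payload['random'])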


Step 9: Create a logfile.log to host our randomly generated numbers

  • Now, we will create the logfile.log which our Python code will log to:


sudo touch logfile.log


  • Let's confirm that we have two files in the folder by running the ls Linux command:


ls





Step 10: Configure the AWS Kinesis Agent to monitor the logfile.log

  • The agent.json is the Kinesis Agent's configuration file. Let's use the cat command to view its contents. By default, the file resides in the following directory:


sudo cat /etc/aws-kinesis/agent.json


  • The file contains default values that were generated during the installation of the agent. We only need the following configuration in the JSON file; therefore, we delete the default file first and then create a fresh one:


sudo rm /etc/aws-kinesis/agent.json 
sudo nano /etc/aws-kinesis/agent.json 


  • Now, let's copy and paste the below JSON into the nano text editor and save it:


{
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "https://kinesis.us-east-1.amazonaws.com",
    "flows": [
        {
            "filePattern": "/opt/stream-1/logfile.log",
            "kinesisStream": "stream-1"
        }
    ]
}



NOTE: If you have changed the location of the Python code, the name of the log file, or the name of the stream, you will have to update the above agent.json accordingly. If you have followed the names used in this tutorial, no modifications are needed.

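If the agent refuses to start later, a syntax error in this file is a likely culprit. Here is a quick, hedged sanity check you can run with Python, assuming the default configuration path:

import json

# A malformed agent.json is a common reason the agent fails to start;
# json.load raises a ValueError on invalid syntax
with open('/etc/aws-kinesis/agent.json') as f:
    config = json.load(f)

for flow in config['flows']:
    print(flow['filePattern'], '->', flow['kinesisStream'])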


Step 11: Test the Python code and tail the logfile.log

  • Prior to running the Kinesis Agent, we need to change the owner of the /opt/stream-1 folder. By doing so, we allow the agent to monitor the log file in the folder, logfile.log to be specific:


sudo chown aws-kinesis-agent-user:aws-kinesis-agent-user -R /opt/stream-1


  • Let's run the ll command to verify that the folder and its files are now owned by the agent user.


  • Also, prior to running the agent, we need to ensure that our Python code is logging data into logfile.log. Let's start the Python code in the background with the following command:


sudo python /opt/stream-1/stream-1.py &



NOTE: Capture the PID number; it's the process ID assigned to the Python process, and we will need it to stop the code from running in the background. You can also find the PID later by running 'sudo ps -x | grep python'.


  • For right now, don't stop the code, but when you want to stop it from running in the background, run the following command:


sudo kill -9 'add PID here'


  • Another useful Linux command lists all running processes:


ps -e


  • Let's tail the logfile.log to see if our code is logging every 5 seconds:


sudo tail -f /opt/stream-1/logfile.log



Note: you should see a new row of data every 5 seconds. To exit the tail command, press Ctrl+C.


Step 12: Run the Kinesis Agent

  • Now, we are ready to run the Kinesis Agent. Let's run the command below so the agent starts monitoring logfile.log and transmitting data points to our stream:


sudo service aws-kinesis-agent start


  • Let's check the status of the agent:


sudo service aws-kinesis-agent status



NOTE: if the agent started properly, the status command should report the service as active (running).


  • Other important agent commands:


sudo service aws-kinesis-agent status
sudo service aws-kinesis-agent restart
sudo service aws-kinesis-agent stop



Step 13: Tail the Kinesis Agent log file

  • Let's tail the Kinesis Agent log. It gives us valuable information about the current status of the agent; we will inspect it for errors or permission messages.


sudo tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log



NOTE: we are looking for confirmation that the agent has started sending data records to our stream. In this run, the agent log reported 26 records sent successfully to destinations, a clear indication that AWS Kinesis Data Streams is ingesting our data records.


Step 14: Monitor the Kinesis Data Streams dashboard

  • Let's head to the AWS Kinesis console. From our Kinesis data stream, click on Monitoring and navigate to the Get records - sum (Count) chart. It should confirm that the data records have been ingested by the stream successfully.



Step 15: Monitor the Kinesis Data Firehose dashboard

  • Now, let's head to our delivery stream (AWS Kinesis Data Firehose), click on Monitoring, and navigate to the Records read from Kinesis Data Streams (Sum) chart as well as the Delivery to Amazon S3 success chart. They should confirm that the firehose has routed the real-time data to our S3 bucket successfully.



Step 16: Download and open the file from the S3 bucket

  • Now that we have confirmed that data records are flowing through our stream and firehose, we should find files in our S3 bucket. Let's download one of the files and open it on our local device. We should confirm that the logged data carries the stream-1 id, the id we defined in our randomly generated numbers Python code; a hedged verification sketch follows below.
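A small, hedged way to check the downloaded file programmatically. The file name below is a placeholder, since Firehose names its objects with the delivery stream name and a timestamp; the check relies on the payload format produced by stream-1.py:

# Hypothetical name of the file downloaded from the S3 bucket
DOWNLOADED_FILE = 'downloaded-firehose-object'

# Each record written by stream-1.py contains the literal text
# "'id': 'stream-1'", so counting those lines verifies the tag
with open(DOWNLOADED_FILE) as f:
    matches = [line for line in f if "'id': 'stream-1'" in line]

print(len(matches), 'records tagged with id stream-1')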




Conclusion:

If you got this far, I would like to congratulate you on your achievement. We have created an AWS Kinesis data stream and an AWS Kinesis Data Firehose delivery stream. We have also employed a Python script to randomly generate numbers and tag the records with a timestamp and stream name at a 5-second interval. Then, we installed the Kinesis Agent on an EC2 instance and configured it to monitor our log file. We confirmed the agent is running successfully and that data records are being ingested by our stream. Lastly, we ensured that AWS Kinesis Data Firehose routed the data records to our S3 bucket destination by downloading the files and inspecting them for the stream id.

I hope the tutorial adds greatly to your learning. Having accomplished a great deal of understanding of AWS Kinesis Data Streams, AWS Kinesis Data Firehose and the AWS Kinesis Agent is a triumph. I wish you all the best in applying this conceptual and practical knowledge to application logs, IoT sensor data, system metrics, videos, audio, analytics and more. Now, off you go, the sky is the limit.
