Overview
This guide outlines an architecture that integrates Amazon Kinesis Data Streams, Elasticsearch, and the Serverless Framework for data collection, processing, and analysis.
Introduction
In this guide, we'll explore the integration of an Amazon Kinesis stream and Elasticsearch using the Serverless Framework. A scheduled Node.js Lambda function generates random data and pushes it to a Kinesis stream; a second Lambda function then consumes the stream and stores the records in Elasticsearch. The ultimate goal is to query the inserted data via Kibana on top of Elasticsearch.
Key Components:
- AWS Kinesis Stream: A scalable stream for real-time data ingestion and buffering.
- Elasticsearch: A powerful search and analytics engine for storing, indexing, and querying data.
- Serverless Framework: A toolkit for building and deploying serverless applications, simplifying infrastructure management.
- Lambda Functions: Serverless functions triggered by events to execute specific tasks.
Data Flow:
Data Generation:
- A time-based event (CloudWatch Event Rule) triggers a Lambda function (sender.js) every 2 minutes.
- "sender.js" generates random data and pushes it into the Kinesis Stream.
Data Consumption and Storage:
- A second Lambda function (receiver.js) continuously consumes data from the Kinesis Stream.
- "receiver.js" extracts and processes the data, preparing it for storage.
- "receiver.js" sends the processed data to Elasticsearch for indexing and storage.
Data Analysis:
- Kibana, a visualization tool integrated with Elasticsearch, enables interactive exploration and analysis of the stored data.
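One detail in the consumption step is worth seeing in isolation before we build anything: Kinesis delivers record payloads base64 encoded inside the Lambda event, so the consumer must decode them before use. A minimal sketch with a hand-built sample event (the field values here are purely illustrative):

```javascript
// Sketch: the shape of a Kinesis event as a consumer Lambda receives it.
// Payloads arrive base64 encoded and must be decoded before use.
const sampleEvent = {
  Records: [
    {
      kinesis: {
        // What the producer's JSON looks like once Kinesis wraps it
        data: Buffer.from(
          JSON.stringify({ name: "Ada", jobTitle: "Engineer" })
        ).toString("base64"),
      },
    },
  ],
};

for (const record of sampleEvent.Records) {
  const payload = JSON.parse(
    Buffer.from(record.kinesis.data, "base64").toString()
  );
  console.log(payload.name, payload.jobTitle); // Ada Engineer
}
```

This is exactly the decode that "receiver.js" performs later in the guide.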
Prerequisites
Before we delve into the implementation, ensure you have Serverless Framework installed on your machine:
npm install serverless -g
Step-by-Step Implementation
1. Create a Serverless Project
serverless create --template aws-nodejs --path my-service
The above command creates a "my-service" directory with the following structure:
.
├── .npmignore
├── handler.js
└── serverless.yml
2. Create a Kinesis Stream
Follow the AWS documentation to create a Kinesis stream as a CloudFormation resource.
3. Schedule Data Generation
Create a CloudWatch Event Rule resource to trigger the "sender.js" Lambda function every 2 minutes.
4. Lambda Permission for Event
Create a Lambda Permission resource for events to invoke the "sender.js" Lambda function.
Check the "serverless.yml" file:
# The service name has to be unique within your account.
service: my-service
# Framework version range supported by this service.
frameworkVersion: '2'
# Cloud provider configuration. As we are using AWS, we define the corresponding AWS settings.
provider:
  name: aws
  runtime: nodejs12.x
  lambdaHashingVersion: 20201221
  region: us-east-2
  # Expose the stream name as an environment variable so the JS code can use it.
  environment:
    kinesisName: ${self:resources.Resources.MyKinesisStream.Properties.Name}
  # Permit the Lambda functions to access Kinesis and Elasticsearch.
  iamRoleStatements:
    - Effect: Allow
      Action:
        - kinesis:*
        - es:*
      Resource: "*"
custom:
  CronExpression: cron(*/2 * * * ? *)
functions:
  # (1) Lambda function that sends random data to the Kinesis stream.
  SendDataTokinesis:
    handler: sender.handler
  # (2) Lambda function that receives random data from the Kinesis stream.
  ReceiveDataFromkinesis:
    handler: receiver.handler
    events:
      - stream:
          type: kinesis
          arn:
            Fn::GetAtt:
              - MyKinesisStream
              - Arn
# Resources are AWS infrastructure components which your functions use.
# The Serverless Framework deploys the AWS components your functions depend upon.
resources:
  Resources:
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: ahmedsalem-KinesisStream
        RetentionPeriodHours: 168
        ShardCount: 3
    LambdaScheduledRule:
      Type: AWS::Events::Rule
      Properties:
        Description: "ScheduledRule"
        ScheduleExpression: ${self:custom.CronExpression}
        State: "ENABLED"
        Targets:
          - Arn: !GetAtt SendDataTokinesisLambdaFunction.Arn
            Id: "TargetFunctionV1"
    PermissionForEventsToInvokeLambda:
      Type: AWS::Lambda::Permission
      Properties:
        FunctionName: !Ref SendDataTokinesisLambdaFunction
        Action: "lambda:InvokeFunction"
        Principal: "events.amazonaws.com"
        SourceArn: !GetAtt LambdaScheduledRule.Arn
    ElasticSearchInstance:
      Type: AWS::Elasticsearch::Domain
      Properties:
        DomainName: 'test'
        EBSOptions:
          EBSEnabled: true
          VolumeType: gp2
          VolumeSize: 10
        AccessPolicies:
          Version: '2012-10-17'
          Statement:
            - Effect: 'Allow'
              Principal:
                Service: 'lambda.amazonaws.com'
              Action: 'es:*'
              Resource: '*'
        ElasticsearchClusterConfig:
          InstanceType: t2.small.elasticsearch
          InstanceCount: 1
          DedicatedMasterEnabled: false
          ZoneAwarenessEnabled: false
        ElasticsearchVersion: 5.3
5. Implement sender.js
Create a Lambda function sender.js that sends random data to the Kinesis stream.
// sender.js — generates one random record and pushes it to the Kinesis stream
const { KinesisClient, PutRecordCommand } = require("@aws-sdk/client-kinesis");
const { v4: uuidv4 } = require("uuid");
const faker = require("faker");

const kinesisInstance = new KinesisClient();
const kinesisName = process.env.kinesisName;

module.exports.handler = async function (event) {
  const response = await kinesisInstance.send(
    new PutRecordCommand({
      // Kinesis expects the record payload as binary data
      Data: Buffer.from(
        JSON.stringify({
          name: faker.name.firstName(),
          jobTitle: faker.name.jobTitle(),
        })
      ),
      // A random partition key spreads records evenly across shards
      PartitionKey: uuidv4(),
      StreamName: kinesisName,
    })
  );
  console.log(response);
};
Then install the required packages:
npm init
npm install @aws-sdk/client-kinesis
npm install uuid
npm install faker
6. Implement receiver.js
Create a Lambda function receiver.js that consumes data from the Kinesis stream.
// receiver.js — consumes records from the Kinesis stream and indexes them in Elasticsearch
const { HttpRequest } = require("@aws-sdk/protocol-http");
const { defaultProvider } = require("@aws-sdk/credential-provider-node");
const { SignatureV4 } = require("@aws-sdk/signature-v4");
const { NodeHttpHandler } = require("@aws-sdk/node-http-handler");
const { Sha256 } = require("@aws-crypto/sha256-browser");
const { v4: uuidv4 } = require("uuid");

const region = "us-east-2";
const domain = "search-test-mz4py3tpigmamsdhngu5gjdvaq.us-east-2.es.amazonaws.com"; // e.g. search-domain.region.es.amazonaws.com
const index = "node-test";
const type = "doc";

module.exports.handler = async function (event) {
  // Use for...of (not forEach) so each await completes before the handler returns
  for (const record of event.Records) {
    // Kinesis data is base64 encoded, so decode it here
    const payload = Buffer.from(record.kinesis.data, "base64").toString();
    console.log("Decoded payload:", payload);
    // A fresh UUID per record keeps documents from overwriting each other
    await indexDocument(payload, uuidv4());
  }
};

async function indexDocument(document, id) {
  // Create the HTTP request
  const request = new HttpRequest({
    body: document,
    headers: {
      "Content-Type": "application/json",
      host: domain,
    },
    hostname: domain,
    method: "PUT",
    path: index + "/" + type + "/" + id,
  });
  // Sign the request with SigV4 so the Elasticsearch domain accepts it
  const signer = new SignatureV4({
    credentials: defaultProvider(),
    region: region,
    service: "es",
    sha256: Sha256,
  });
  const signedRequest = await signer.sign(request);
  // Send the request and read the streamed response body
  const client = new NodeHttpHandler();
  const { response } = await client.handle(signedRequest);
  console.log(response.statusCode);
  let responseBody = "";
  await new Promise((resolve, reject) => {
    response.body.on("data", (chunk) => {
      responseBody += chunk;
    });
    response.body.on("end", () => {
      console.log("Response body: " + responseBody);
      resolve();
    });
    response.body.on("error", reject);
  });
}
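A note on iteration in receiver.js: an async callback passed to forEach is fired and forgotten, so a handler written that way can return before any document is indexed. This toy comparison (illustrative functions only, no AWS involved) shows the difference:

```javascript
// forEach invokes the async callback but never awaits it, so the
// length observed at return time is still 0.
function withForEach(items, out) {
  items.forEach(async (item) => {
    await Promise.resolve(); // stand-in for an indexDocument() call
    out.push(item);
  });
  return out.length;
}

// for...of inside an async function genuinely waits for each item.
async function withForOf(items, out) {
  for (const item of items) {
    await Promise.resolve();
    out.push(item);
  }
  return out.length;
}

console.log(withForEach([1, 2], [])); // 0 — pushes happen after the function returns
withForOf([1, 2], []).then((n) => console.log(n)); // 2
```

This is why the handler above uses for...of with await rather than event.Records.forEach.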
7. Set Up ElasticSearch
Create an ElasticSearch resource with the necessary permissions for Lambda to access it.
8. Update receiver.js
Update the receiver.js Lambda function to consume the Kinesis stream and send data to Elasticsearch. Install the packages it imports:
npm install @aws-sdk/protocol-http @aws-sdk/credential-provider-node @aws-sdk/signature-v4 @aws-sdk/node-http-handler @aws-crypto/sha256-browser
Conclusion
Following these detailed steps, you've successfully integrated an Amazon Kinesis stream and Elasticsearch using the Serverless Framework. This architecture lets you generate and process data in near real time, storing it in Elasticsearch for easy querying and analysis with tools like Kibana.
References
Serverless Framework - DynamoDB / Kinesis Streams
Tutorial: Using AWS Lambda with Amazon Kinesis