Leveraging Kinesis Stream and Elasticsearch with the Serverless Framework

Overview
This guide outlines an architecture that integrates Amazon Kinesis Stream, Elasticsearch, and the Serverless Framework for efficient data collection, processing, and analysis.

Introduction
In this guide, we'll explore the integration of Amazon Kinesis Stream and Elasticsearch using the Serverless Framework. The implementation creates a scheduled Node.js Lambda function that generates random data and pushes it to a Kinesis stream. A second Lambda function then consumes the stream and stores the data in Elasticsearch. The ultimate goal is to query the inserted data through Kibana on top of Elasticsearch.

Key Components:

- AWS Kinesis Stream: A scalable stream for real-time data ingestion and buffering.

- Elasticsearch: A powerful search and analytics engine for storing, indexing, and querying data.

- Serverless Framework: A toolkit for building and deploying serverless applications, simplifying infrastructure management.

- Lambda Functions: Serverless functions triggered by events to execute specific tasks.

Architecture
(Architecture diagram: scheduled CloudWatch Event Rule → sender.js Lambda → Kinesis Stream → receiver.js Lambda → Elasticsearch → Kibana)

Data Flow:

Data Generation:

  • A time-based event (CloudWatch Event Rule) triggers a Lambda function (sender.js) every 2 minutes. "sender.js" generates random data and pushes it into the Kinesis Stream.

Data Consumption and Storage:

  • A second Lambda function (receiver.js) continuously consumes data from the Kinesis Stream.
  • "receiver.js" extracts and processes the data, preparing it for storage.
  • "receiver.js" sends the processed data to Elasticsearch for indexing and storage.

Data Analysis:

  • Kibana, a visualization tool integrated with Elasticsearch, enables interactive exploration and analysis of the stored data.

Prerequisites

Before we delve into the implementation, ensure you have Serverless Framework installed on your machine:

npm install serverless -g

Step-by-Step Implementation

1. Create a Serverless Project

serverless create --template aws-nodejs --path my-service

The above command creates a "my-service" directory with the following structure:
.
├── .npmignore
├── handler.js
└── serverless.yml

2. Create a Kinesis Stream
Follow the AWS documentation to create a Kinesis stream as a CloudFormation resource.
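For reference, this is the stream resource as it appears in the full serverless.yml later in this guide:

resources:
  Resources:
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: ahmedsalem-KinesisStream   # example name from this guide; pick your own
        RetentionPeriodHours: 168        # keep records for 7 days
        ShardCount: 3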

3. Schedule Data Generation
Create a CloudWatch Event Rule resource to trigger the "sender.js" Lambda function every 2 minutes.
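These are the pieces of the serverless.yml below that implement the schedule: the cron expression fires every 2 minutes, and the target ARN points at the function the Framework generates for SendDataTokinesis:

custom:
  CronExpression: cron(*/2 * * * ? *)   # every 2 minutes

resources:
  Resources:
    LambdaScheduledRule:
      Type: AWS::Events::Rule
      Properties:
        Description: "ScheduledRule"
        ScheduleExpression: ${self:custom.CronExpression}
        State: "ENABLED"
        Targets:
          - Arn: !GetAtt SendDataTokinesisLambdaFunction.Arn
            Id: "TargetFunctionV1"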

4. Lambda Permission for Event
Create a Lambda Permission resource that allows CloudWatch Events (events.amazonaws.com) to invoke the "sender.js" Lambda function.
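The corresponding resource from the serverless.yml below grants the events service permission to invoke the function targeted by the scheduled rule:

resources:
  Resources:
    PermissionForEventsToInvokeLambda:
      Type: AWS::Lambda::Permission
      Properties:
        FunctionName: !Ref SendDataTokinesisLambdaFunction
        Action: "lambda:InvokeFunction"
        Principal: "events.amazonaws.com"
        SourceArn: !GetAtt LambdaScheduledRule.Arn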

Check the "handler.js" code:

const { KinesisClient, PutRecordCommand } = require("@aws-sdk/client-kinesis"); // CommonJS import
const {v4: uuidv4} = require("uuid")
const faker = require("faker")

const kinesisInstance = new KinesisClient();

const kinesisName = process.env.kinesisName

module.exports.handler = async function(event) {
  const response = await kinesisInstance.send(new PutRecordCommand({
    Data: Buffer.from(JSON.stringify({
      name: faker.name.firstName(),
      jobTitle: faker.name.jobTitle()  
    })),
    PartitionKey: uuidv4(),
    StreamName: kinesisName
  }));
  console.log(response)
}
Enter fullscreen mode Exit fullscreen mode

Now check the full "serverless.yml" file:

# The service name has to be unique to your account.
service: my-service

# framework version range supported by this service.
frameworkVersion: '2'

# Configuration of the cloud provider. Since we are using AWS, we define the corresponding AWS configuration.
provider:
  name: aws
  #runtime: nodejs14.x
  #stage: dev
  runtime: nodejs12.x
  lambdaHashingVersion: 20201221
  region: us-east-2

  # Create an ENV variable to be able to use it in my JS code. 
  environment:
    kinesisName: ${self:resources.Resources.MyKinesisStream.Properties.Name}

  # To permit each Lambda function to access the Kinesis stream and the Elasticsearch domain
  iamRoleStatements:
    - Effect: Allow
      Action:
        - kinesis:*
        - es:*
      Resource: "*"


custom:
  CronExpression: cron(*/2 * * * ? *)  


functions:
  #(1) Lambda function that sends random data to the Kinesis stream.
  SendDataTokinesis:
    handler: sender.handler

  #(2) Lambda function that receives random data from the Kinesis stream.
  ReceiveDataFromkinesis:
    handler: receiver.handler
    events:
      - stream:
          type: kinesis
          arn:
            Fn::GetAtt:
              - MyKinesisStream
              - Arn

#Resources are AWS infrastructure components which your Functions use. 
#The Serverless Framework deploys the AWS components your Functions depend upon.
resources:
  Resources:
    MyKinesisStream: 
      Type: AWS::Kinesis::Stream 
      Properties: 
        Name: ahmedsalem-KinesisStream
        RetentionPeriodHours: 168
        ShardCount: 3

    LambdaScheduledRule: 
      Type: AWS::Events::Rule
      Properties: 
        Description: "ScheduledRule"
        ScheduleExpression: ${self:custom.CronExpression}
        State: "ENABLED"
        Targets: 
          - 
            Arn: !GetAtt 'SendDataTokinesisLambdaFunction.Arn'
            Id: "TargetFunctionV1"

    PermissionForEventsToInvokeLambda:
      Type: AWS::Lambda::Permission
      Properties:
        FunctionName: !Ref SendDataTokinesisLambdaFunction
        Action: "lambda:InvokeFunction"
        Principal: "events.amazonaws.com"
        SourceArn: !GetAtt LambdaScheduledRule.Arn

    ElasticSearchInstance:
      Type: AWS::Elasticsearch::Domain
      Properties:
        DomainName: 'test'
        EBSOptions:
          EBSEnabled: true
          VolumeType: gp2
          VolumeSize: 10
        AccessPolicies:
          Version: '2012-10-17'
          Statement:
            -
              Effect: 'Allow'
              Principal:
                Service: 'lambda.amazonaws.com'
                #AWS: 'arn:aws:iam::<ACCOUNT-ID>:user/<USER-ID>'
              Action: 'es:*'
              #Resource: 'arn:aws:es:us-east-2:944163165741:domain/test/*'
              Resource: '*'
        ElasticsearchClusterConfig:
          InstanceType: t2.small.elasticsearch
          InstanceCount: 1
          DedicatedMasterEnabled: false
          ZoneAwarenessEnabled: false
        ElasticsearchVersion: 5.3



5. Implement sender.js
Create a Lambda function sender.js that sends random data to the Kinesis stream.

const { KinesisClient, PutRecordCommand } = require("@aws-sdk/client-kinesis"); // CommonJS import
const { v4: uuidv4 } = require("uuid");
const faker = require("faker");

const kinesisInstance = new KinesisClient();

// Stream name injected via the environment block in serverless.yml
const kinesisName = process.env.kinesisName;

module.exports.handler = async function(event) {
  // Put a single record of random fake data; a random partition key spreads records across shards
  const response = await kinesisInstance.send(new PutRecordCommand({
    Data: Buffer.from(JSON.stringify({
      name: faker.name.firstName(),
      jobTitle: faker.name.jobTitle()
    })),
    PartitionKey: uuidv4(),
    StreamName: kinesisName
  }));
  console.log(response);
};

Then, install the required packages:

npm init
npm install @aws-sdk/client-kinesis
npm install uuid
npm install faker
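If you want to sanity-check the sender before wiring up the schedule, you can run it locally with the Serverless CLI (a quick test only; it still needs AWS credentials and the kinesisName environment variable resolved from serverless.yml to actually put a record):

serverless invoke local -f SendDataTokinesis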

6. Implement receiver.js
Create a Lambda function receiver.js that consumes data from the Kinesis stream.

const { HttpRequest} = require("@aws-sdk/protocol-http");
const { defaultProvider } = require("@aws-sdk/credential-provider-node");
const { SignatureV4 } = require("@aws-sdk/signature-v4");
const { NodeHttpHandler } = require("@aws-sdk/node-http-handler");
const { Sha256 } = require("@aws-crypto/sha256-browser");
const {v4: uuidv4} = require("uuid")


const region = 'us-east-2';
const domain = 'search-test-mz4py3tpigmamsdhngu5gjdvaq.us-east-2.es.amazonaws.com'; // e.g. search-domain.region.es.amazonaws.com
const index = 'node-test';
const type = 'doc';


module.exports.handler = async function(event) {
    // Use for...of so each indexDocument call is actually awaited;
    // forEach with an async callback would let the handler return before indexing finishes.
    for (const record of event.Records) {
        // Kinesis data is base64 encoded so decode here
        const payload = Buffer.from(record.kinesis.data, 'base64').toString();
        console.log('Decoded payload:', payload);
        await indexDocument(payload);
    }
}

async function indexDocument(document) {

    // Create the HTTP request; a fresh uuid per record keeps documents from overwriting each other
    const request = new HttpRequest({
        body: document,
        headers: {
            'Content-Type': 'application/json',
            'host': domain
        },
        hostname: domain,
        method: 'PUT',
        path: index + '/' + type + '/' + uuidv4()
    });

    // Sign the request with SigV4 credentials for the Elasticsearch ('es') service
    const signer = new SignatureV4({
        credentials: defaultProvider(),
        region: region,
        service: 'es',
        sha256: Sha256
    });

    const signedRequest = await signer.sign(request);

    // Send the request
    const client = new NodeHttpHandler();
    const { response } = await client.handle(signedRequest);
    console.log(response.statusCode);

    // Collect the streamed response body before logging it
    const responseBody = await new Promise((resolve, reject) => {
        let body = '';
        response.body.on('data', (chunk) => {
            body += chunk;
        });
        response.body.on('end', () => resolve(body));
        response.body.on('error', reject);
    });
    console.log('Response body: ' + responseBody);
}

7. Set Up Elasticsearch
Create an Elasticsearch domain resource with the permissions Lambda needs to access it.
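For reference, this is the ElasticSearchInstance resource already included in the serverless.yml above; its access policy is what allows the Lambda service to write to the domain:

resources:
  Resources:
    ElasticSearchInstance:
      Type: AWS::Elasticsearch::Domain
      Properties:
        DomainName: 'test'
        ElasticsearchVersion: 5.3
        ElasticsearchClusterConfig:
          InstanceType: t2.small.elasticsearch
          InstanceCount: 1
          DedicatedMasterEnabled: false
          ZoneAwarenessEnabled: false
        EBSOptions:
          EBSEnabled: true
          VolumeType: gp2
          VolumeSize: 10
        AccessPolicies:
          Version: '2012-10-17'
          Statement:
            - Effect: 'Allow'
              Principal:
                Service: 'lambda.amazonaws.com'
              Action: 'es:*'
              Resource: '*'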

8. Update receiver.js
Update the receiver.js Lambda function (shown in step 6) to consume the Kinesis stream and send data to Elasticsearch. Install the packages it imports:

npm install @aws-sdk/protocol-http @aws-sdk/credential-provider-node @aws-sdk/signature-v4 @aws-sdk/node-http-handler @aws-crypto/sha256-browser
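With the functions and resources in place, deploying the whole stack and tailing the consumer's logs are single commands (standard Serverless Framework CLI usage; the function name matches the serverless.yml above):

serverless deploy
serverless logs -f ReceiveDataFromkinesis -t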

Conclusion
By following these steps, you've integrated Amazon Kinesis Stream and Elasticsearch using the Serverless Framework. This architecture lets you generate and process data in near real time, storing it in Elasticsearch for easy querying and analysis with tools like Kibana.
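As a quick verification (assuming your domain's access policy also lets your IAM user reach Kibana, e.g. the commented-out AWS principal in the serverless.yml), you can run a simple search from the Kibana Dev Tools console; node-test is the index name used in receiver.js:

GET node-test/_search
{
  "query": {
    "match_all": {}
  }
}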

References
Serverless Framework - DynamoDB / Kinesis Streams

Tutorial: Using AWS Lambda with Amazon Kinesis

What is Amazon OpenSearch Service?

KinesisClient

AWS::Elasticsearch::Domain
