DEV Community

Yogesh Manware


Real Time Event Processing with AWS Kinesis + Lambda + ElasticSearch

I use AWS Kinesis, Lambda (Node.js), and Elasticsearch in my architecture. Each Kinesis shard is billed per hour, so the shard count directly drives cost. The requirement is to process millions of events per hour in an event-driven architecture, with near-real-time processing expected.
While Kinesis has excellent throughput, Elasticsearch insert/update operations can be time-consuming when a considerable number of fields must be indexed.

Few Facts

High Level Architecture


There are N shards and a high-capacity Elasticsearch cluster. By default, one Kinesis shard triggers at most one Lambda invocation at any point in time.
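Because each shard drives one invocation at a time, the shard count can be estimated from the time a single batch takes. The following is a minimal back-of-the-envelope sketch; the function name `estimateShardCount` and all input numbers are illustrative assumptions, and it ignores Kinesis's own per-shard ingest limits.

```javascript
// Rough capacity estimate; every number passed in is an illustrative assumption.
function estimateShardCount(eventsPerHour, batchSize, secondsPerBatch) {
    // One shard feeds one invocation at a time, so a shard completes
    // (3600 / secondsPerBatch) batches of batchSize events per hour.
    const eventsPerShardPerHour = (3600 / secondsPerBatch) * batchSize;
    return Math.ceil(eventsPerHour / eventsPerShardPerHour);
}

// Example: 3 million events/hour, 50 events per batch, about 1 second per batch.
console.log(estimateShardCount(3000000, 50, 1)); // 17
```

Halving the time per batch (for example through concurrency or bulk writes) roughly halves the number of shards needed under these assumptions.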

Using concurrency and bulk updates, real-time processing can be achieved with an optimal number of Kinesis stream shards.
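As a rough illustration of the bulk side, the Elasticsearch Bulk API accepts a newline-delimited JSON body in which each document is preceded by an action line. The sketch below only builds that body; the index name `events`, the `{ id, doc }` record shape, and the helper name `buildBulkBody` are assumptions, and depending on the Elasticsearch version the action line may need extra metadata.

```javascript
// Builds a newline-delimited JSON body for the Elasticsearch Bulk API.
// The index name and the { id, doc } record shape are hypothetical.
function buildBulkBody(indexName, records) {
    const lines = [];
    for (const { id, doc } of records) {
        // Action line: index (insert or replace) the document with this id.
        lines.push(JSON.stringify({ index: { _index: indexName, _id: id } }));
        // Source line: the document itself.
        lines.push(JSON.stringify(doc));
    }
    // The Bulk API requires the body to end with a newline.
    return lines.join('\n') + '\n';
}

const body = buildBulkBody('events', [
    { id: 'EVT001', doc: { a: 'value 1' } },
    { id: 'EVT002', doc: { a: 'value 2' } },
]);
console.log(body);
```

Sending one such body per group of events replaces many single-document requests with one round trip.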

I designed a Lambda function to consume a batch of 50 events at a time. Each event is between 5 KB and 1 MB in size. The function inserts the 50 records concurrently.
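For context, a Kinesis-triggered Lambda receives its batch as `event.Records`, with each payload base64-encoded in `record.kinesis.data`; the batch size itself is set on the event source mapping rather than in code. Here is a minimal decoding sketch, assuming each payload is a JSON document (the post does not say); `decodeKinesisRecords` and `sampleEvent` are hypothetical names.

```javascript
// Decodes the records of a Kinesis-triggered Lambda invocation.
// Assumes every payload is a JSON document, which is an assumption.
function decodeKinesisRecords(event) {
    return event.Records.map((record) => {
        // Kinesis delivers the record payload as a base64 string.
        const json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
        return {
            sequenceNumber: record.kinesis.sequenceNumber,
            payload: JSON.parse(json),
        };
    });
}

// Hand-built sample event for a local run; real events carry more fields.
const sampleEvent = {
    Records: [
        {
            kinesis: {
                sequenceNumber: '1',
                data: Buffer.from(JSON.stringify({ a: 'value 1' })).toString('base64'),
            },
        },
    ],
};
console.log(decodeKinesisRecords(sampleEvent));
```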

Note that the concurrency, batch size, and bulk record size need to be tuned based on the event size and the Elasticsearch cluster's capacity and limits. You might also need to review the Lambda function's memory setting based on the concurrency and the batch/event size.
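One way to make these knobs explicit is to split a batch by both record count and total size before writing it. The sketch below is illustrative only; `chunkRecords`, `maxCount`, and `maxBytes` are hypothetical names, and the limits shown are not recommendations.

```javascript
// Splits records into chunks bounded by a record count and a total byte size.
// A single record larger than maxBytes still gets a chunk of its own.
function chunkRecords(records, maxCount, maxBytes) {
    const chunks = [];
    let current = [];
    let currentBytes = 0;
    for (const record of records) {
        const size = Buffer.byteLength(JSON.stringify(record), 'utf8');
        const full = current.length >= maxCount || currentBytes + size > maxBytes;
        if (current.length > 0 && full) {
            // Close the current chunk before it exceeds either limit.
            chunks.push(current);
            current = [];
            currentBytes = 0;
        }
        current.push(record);
        currentBytes += size;
    }
    if (current.length > 0) chunks.push(current);
    return chunks;
}

const records = Array.from({ length: 5 }, (_, i) => ({ a: `value ${i}` }));
console.log(chunkRecords(records, 2, 1024 * 1024).map((c) => c.length)); // [ 2, 2, 1 ]
```

With events ranging from 5 KB to 1 MB, the byte limit is likely to matter as much as the count limit.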

The following code snippet is an example of concurrent processing in Node.js using the async library.

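It processes the dummy events in groups of up to 50 at a time. Note that the loop bound is inclusive, so it generates 101 events rather than 100.
Assuming it takes 1 second to insert one record into Elasticsearch, the events complete in three waves (50, 50, and 1), which is why the run takes ~3 seconds; exactly 100 events would finish in ~2 seconds.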

const async = require('async');

// Maximum number of events processed at any point in time (default 50).
// Environment variables are strings, so parse the value before use.
const MAX_CONCURRENCY = Number.parseInt(process.env.MAX_CONCURRENCY, 10) || 50;

const data = new Map();

// Create dummy events; the inclusive bound yields 101 events (i = 0 through 100).
for (let i = 0; i <= 100; i++) {
    data.set(`EVT00${i}`, [{a: `value ${i}`}, {a: `value 2a ${i}`}]);
}

// Named processBatch rather than process so that it does not shadow Node's
// global process object, which MAX_CONCURRENCY reads above.
async function processBatch(payload) {
    const startTime = Date.now();
    const functions = [];
    payload.forEach((feeds, id) => {
        console.log(`queueing the feed for ${id}`);
        // Each task is processEvent with its arguments pre-bound.
        functions.push(processEvent.bind(null, feeds, id));
    });
    // Run the tasks with at most MAX_CONCURRENCY in flight.
    const result = await async.parallelLimit(functions, MAX_CONCURRENCY);
    console.info('processing is complete');
    const filteredResult = result.filter(element => element.data).map(element => element.data);
    console.info(JSON.stringify(filteredResult, null, 2));
    console.log(`Total time taken ${Date.now() - startTime} millis`);
}

// Simulates an Elasticsearch API call for one group of messages.
// Kept async so the async library treats the bound task as promise-returning.
async function processEvent(arrayOfFeeds, id) {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log(`processing ${id} : ${JSON.stringify(arrayOfFeeds)}`);
            resolve({'data': {a: `aggregated value - must be an es update record ${id}`}});
        }, 1000);
    });
}

setImmediate(() => processBatch(data));
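The total time of the snippet above can be estimated with simple arithmetic when every call takes about the same time. This is a small sketch of that estimate; `estimateBatchSeconds` is a hypothetical helper, and real Elasticsearch latencies will vary from call to call.

```javascript
// Estimates wall-clock time when every task takes the same time and
// at most `concurrency` tasks run at once.
function estimateBatchSeconds(eventCount, concurrency, secondsPerEvent) {
    // Tasks finish in ceil(eventCount / concurrency) waves.
    return Math.ceil(eventCount / concurrency) * secondsPerEvent;
}

console.log(estimateBatchSeconds(101, 50, 1)); // 3
console.log(estimateBatchSeconds(100, 50, 1)); // 2
```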
