DEV Community

Cover image for Use Golang for data processing with Amazon Kinesis and AWS Lambda
Abhishek Gupta for AWS

Posted on • Updated on • Originally published at abhishek1987.Medium

Use Golang for data processing with Amazon Kinesis and AWS Lambda

This blog post is for folks interested in learning how to use Golang and AWS Lambda to build a serverless solution. You will be using the aws-lambda-go library along with the AWS Go SDK v2 for an application that will process records from an Amazon Kinesis data stream and store them in a DynamoDB table. But that's not all! You will also use Go bindings for AWS CDK to implement "Infrastructure-as-code" for the entire solution and deploy it with the AWS CDK CLI.

What's covered?

Introduction

Amazon Kinesis is a platform for real-time data processing, ingestion, and analysis. Kinesis Data Streams is a serverless streaming data service (part of the Kinesis streaming data platform, along with Kinesis Data Firehose, Kinesis Video Streams, and Kinesis Data Analytics) that enables developers to collect, process, and analyze large amounts of data in real-time from various sources such as social media, IoT devices, logs, and more. AWS Lambda, on the other hand, is a serverless compute service that allows developers to run their code without having to manage the underlying infrastructure.

The integration of Amazon Kinesis with AWS Lambda provides an efficient way to process and analyze large data streams in real-time. A Kinesis data stream is a set of shards and each shard contains a sequence of data records. A Lambda function can act as a consumer application and process data from a Kinesis data stream. You can map a Lambda function to a shared-throughput consumer (standard iterator), or to a dedicated-throughput consumer with enhanced fan-out. For standard iterators, Lambda polls each shard in your Kinesis stream for records using HTTP protocol. The event source mapping shares read throughput with other consumers of the shard.

Amazon Kinesis and AWS Lambda can be used together to build many solutions including real-time analytics (allowing businesses to make informed decisions), log processing (use logs to proactively identify and address issues in server/applications etc. before they become critical), IoT data processing (analyze device data in real-time and trigger actions based on the results), clickstream analysis (provide insights into user behavior), fraud detection (detect and prevent fraudulent card transactions) and more.

As always, the code is available on GitHub

Image description

Pre-requisites

Before you proceed, make sure you have the Go programming language (v1.18 or higher) and AWS CDK installed.

Clone the GitHub repository and change to the right directory:

git clone https://github.com/abhirockzz/kinesis-lambda-events-golang

cd kinesis-lambda-events-golang
Enter fullscreen mode Exit fullscreen mode

Use AWS CDK to deploy the solution

To start the deployment, simply invoke cdk deploy and wait for a bit. You will see a list of resources that will be created and will need to provide your confirmation to proceed.

cd cdk

cdk deploy

# output

Bundling asset KinesisLambdaGolangStack/kinesis-function/Code/Stage...

✨  Synthesis time: 5.94s

This deployment will make potentially sensitive changes according to your current security approval level (--require-approval broadening).
Please confirm you intend to make the following modifications:

//.... omitted

Do you wish to deploy these changes (y/n)? y
Enter fullscreen mode Exit fullscreen mode

This will start creating the AWS resources required for our application.

If you want to see the AWS CloudFormation template which will be used behind the scenes, run cdk synth and check the cdk.out folder

You can keep track of the progress in the terminal or navigate to AWS console: CloudFormation > Stacks > KinesisLambdaGolangStack

Once all the resources are created, you can try out the application. You should have:

  • A Lambda function
  • A Kinesis stream
  • A DynamoDB table
  • along with a few other components (like IAM roles etc.)

Verify the solution

You can check the table and Kinesis stream info in the stack output (in the terminal or the Outputs tab in the AWS CloudFormation console for your Stack):

Image description

Publish few messages to the Kinesis stream. For the purposes of this demo, you can use the AWS CLI:

export KINESIS_STREAM=<enter the Kinesis stream name from cloudformation output>

aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"seattle"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo.com --data $(echo -n '{"name":"user3", "city":"new york"}' | base64)
Enter fullscreen mode Exit fullscreen mode

Check the DynamoDB table to confirm that the file metadata has been stored. You can use the AWS console or the AWS CLI aws dynamodb scan --table-name <enter the table name from cloudformation output>

Image description

Don't forget to clean up

Once you're done, to delete all the services, simply use:

cdk destroy

#output prompt (choose 'y' to continue)

Are you sure you want to delete: KinesisLambdaGolangStack (y/n)?
Enter fullscreen mode Exit fullscreen mode

You were able to setup and try the complete solution. Before we wrap up, let's quickly walk through some of important parts of the code to get a better understanding of what's going the behind the scenes.

Code walk through

Some of the code (error handling, logging etc.) has been omitted for brevity since we only want to focus on the important parts.

AWS CDK

You can refer to the CDK code here

We start by creating the DynamoDB table:

    table := awsdynamodb.NewTable(stack, jsii.String("dynamodb-table"),
        &awsdynamodb.TableProps{
            PartitionKey: &awsdynamodb.Attribute{
                Name: jsii.String("email"),
                Type: awsdynamodb.AttributeType_STRING},
        })

    table.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)
Enter fullscreen mode Exit fullscreen mode

We create the Lambda function (CDK will take care of building and deploying the function) and make sure we provide it appropriate permissions to write to the DynamoDB table.

    function := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("kinesis-function"),
        &awscdklambdagoalpha.GoFunctionProps{
            Runtime:     awslambda.Runtime_GO_1_X(),
            Environment: &map[string]*string{"TABLE_NAME": table.TableName()},
            Entry:       jsii.String(functionDir),
        })

    table.GrantWriteData(function)
Enter fullscreen mode Exit fullscreen mode

Then, we create the Kinesis stream and add that as an event source to the Lambda function.

    kinesisStream := awskinesis.NewStream(stack, jsii.String("lambda-test-stream"), nil)

    function.AddEventSource(awslambdaeventsources.NewKinesisEventSource(kinesisStream, &awslambdaeventsources.KinesisEventSourceProps{
        StartingPosition: awslambda.StartingPosition_LATEST,
    }))
Enter fullscreen mode Exit fullscreen mode

Finally, we export the Kinesis stream and DynamoDB table name as CloudFormation outputs.

    awscdk.NewCfnOutput(stack, jsii.String("kinesis-stream-name"),
        &awscdk.CfnOutputProps{
            ExportName: jsii.String("kinesis-stream-name"),
            Value:      kinesisStream.StreamName()})

    awscdk.NewCfnOutput(stack, jsii.String("dynamodb-table-name"),
        &awscdk.CfnOutputProps{
            ExportName: jsii.String("dynamodb-table-name"),
            Value:      table.TableName()})
Enter fullscreen mode Exit fullscreen mode

Lambda function

You can refer to the Lambda Function code here

The Lambda function handler iterates over each record in the Kinesis stream, and for each of them:

  • Unmarshals the JSON payload in the Kinese stream into a Go struct
  • Stores the stream data partition key as the primary key attribute (email) of the DynamoDB table
  • Rest of the information is picked up from the stream data and also stored in the table.
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {

    for _, record := range kinesisEvent.Records {

        data := record.Kinesis.Data

        var user CreateUserInfo
        err := json.Unmarshal(data, &user)

        item, err := attributevalue.MarshalMap(user)
        if err != nil {
            return err
        }

        item["email"] = &types.AttributeValueMemberS{Value: record.Kinesis.PartitionKey}

        _, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{
            TableName: aws.String(table),
            Item:      item,
        })
    }

    return nil
}

type CreateUserInfo struct {
    Name string `json:"name"`
    City string `json:"city"`
}
Enter fullscreen mode Exit fullscreen mode

Wrap up

In this blog, you saw an example of how to use Lambda to process messages in a Kinesis stream and store them in DynamoDB, thanks to the Kinesis and Lamdba integration. The entire infrastructure life-cycle was automated using AWS CDK.

All this was done using the Go programming language, which is well supported in DynamoDB, AWS Lambda and AWS CDK.

Happy building!

Top comments (0)