ohalay

AWS DynamoDB stream batch processing lambda using CDK

Simple lambda

The most common way to create a lambda is to install the AWS Toolkit, which adds AWS project templates. After that, choose the lambda template with a DynamoDB trigger, and that is all you need. But what about infrastructure?

public void FunctionHandler(DynamoDBEvent dynamoEvent)
{
  foreach (var record in dynamoEvent.Records)
  {   
    // TODO: Add business logic processing the record.Dynamodb
  }
}

CDK

Infrastructure should live together with the code because the two share the same lifetime. It should also be easy to reproduce in other environments. CDK is an IaC tool that solves both problems, and we can write the infrastructure in our lovely C# language.

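using Amazon.CDK;
using Amazon.CDK.AWS.DynamoDB;
using Amazon.CDK.AWS.Lambda;
using Amazon.CDK.AWS.Lambda.EventSources;
using Constructs;
// Aliases resolve name clashes (System.Attribute, Amazon.CDK.AssetOptions).
using Attribute = Amazon.CDK.AWS.DynamoDB.Attribute;
using AssetOptions = Amazon.CDK.AWS.S3.Assets.AssetOptions;

public class DynamoDbLambda : Stack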
{
  internal DynamoDbLambda(Construct scope, string id, IStackProps props) 
     : base(scope, id, props)
  {
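    var dynamoDbTable = new Table(this, "testTable", new TableProps
    {
      BillingMode = BillingMode.PAY_PER_REQUEST,
      PartitionKey = new Attribute { Name = "Pk", Type = AttributeType.STRING },
      SortKey = new Attribute { Name = "Sk", Type = AttributeType.STRING },
      // The stream must be enabled, otherwise DynamoEventSource fails at synth time.
      // NEW_AND_OLD_IMAGES is one possible view type; pick the one you need.
      Stream = StreamViewType.NEW_AND_OLD_IMAGES,
    });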

    var dynamoDbLambda = new Function(this, "dynamoDbLambda", new FunctionProps
    {
      Runtime = Runtime.DOTNET_6,
      MemorySize = 256,
      Handler = "Serverless.DynamoDbLambda::Serverless.DynamoDbLambda.LambdaHandler::Handle",
      Code = Code.FromAsset("Serverless.DynamoDbLambda/", new AssetOptions
      {
        // buildOption holds the bundling options for the .NET build (defined elsewhere, not shown here).
        Bundling = buildOption
      }),
    });

    dynamoDbLambda.AddEventSource(new DynamoEventSource(dynamoDbTable, new DynamoEventSourceProps
    {
      StartingPosition = StartingPosition.TRIM_HORIZON,
    }));
  }
}

Since we use a DynamoDB single-table design, the stream delivers changes for every logical table stored in the physical table, but we want to listen to only one specific logical table...

Filters

A Lambda event source can filter the DynamoDB stream. The CDK syntax is a little bit weird, but it is just a representation of the JSON filter pattern.

Filters = new[]
{
  FilterCriteria.Filter(new Dictionary<string, object> {
    ["dynamodb"] = new Dictionary<string, object>
    {
      ["Keys"] = new Dictionary<string, object>
      {
        ["Pk"] = new Dictionary<string, object>
        {
          // Prefix match: Pk must start with "MyTableName".
          ["S"] = new[] { new Dictionary<string, object> { ["prefix"] = "MyTableName" } }
        }
      }
    }
  })
},
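For comparison, a prefix match on the partition key looks roughly like this in Lambda's event-filtering JSON. This is a sketch: `Pk` and `MyTableName` are the placeholder names used in this post, and the nested dictionaries in the CDK code are meant to serialize to this shape.

```json
{
  "dynamodb": {
    "Keys": {
      "Pk": {
        "S": [{ "prefix": "MyTableName" }]
      }
    }
  }
}
```

Note that the prefix rule is an object inside the array. A plain array of strings would instead match any of the listed values exactly.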

All good, with only one thing left... handling failed records.

Fails and retries

There's no easy way to handle failed records, because failures can have a lot of causes:

  • Bugs in lambda code
  • Unavailable services
  • Db structure changed

We may retry and, if the retries are exhausted, send the details of the failed batch to SQS.

var deadLetterQueue = new Queue(this, "deadLetterQueue");
dynamoDbLambda.AddEventSource(
  new DynamoEventSource(dynamoDbTable, new DynamoEventSourceProps
  {
    StartingPosition = StartingPosition.TRIM_HORIZON, // required by DynamoEventSourceProps
    OnFailure = new SqsDlq(deadLetterQueue),
    RetryAttempts = 5,
  }));

So far so good, but how do we handle a partial batch failure?

Batch processing

Instead of returning void from the lambda handler, we can report the failed items and Lambda will handle the partial failure. Set ReportBatchItemFailures = true on the event source in CDK and modify the lambda itself.
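On the CDK side, the option goes into the event source props. A minimal sketch, assuming the function and table from the stack above; `StreamWiring` and `AttachStream` are hypothetical names used only for illustration:

```csharp
using Amazon.CDK.AWS.DynamoDB;
using Amazon.CDK.AWS.Lambda;
using Amazon.CDK.AWS.Lambda.EventSources;

// Hypothetical helper, not part of the original project.
internal static class StreamWiring
{
  internal static void AttachStream(Function function, ITable table)
  {
    function.AddEventSource(new DynamoEventSource(table, new DynamoEventSourceProps
    {
      StartingPosition = StartingPosition.TRIM_HORIZON,
      // Lets the handler report failed items instead of failing the whole batch.
      ReportBatchItemFailures = true,
    }));
  }
}
```

The handler then changes as follows.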

public async Task<StreamsEventResponse> Handle(DynamoDBEvent events)
{
  var tasks = events.Records
   .Select(new Executor().Execute)
   .ToList();

  var failedItems = new List<BatchItemFailure>();

  try
  {
    await Task.WhenAll(tasks);
  }
  catch
  {
    failedItems = tasks
      .Select((task, index) => new { task, index })
      .Where(x => !x.task.IsCompletedSuccessfully)
      .Select(x => new BatchItemFailure 
      {
        // For DynamoDB streams, Lambda expects the record's sequence number here.
        ItemIdentifier = events.Records[x.index].Dynamodb.SequenceNumber
      })
      .ToList();
  }

  return new StreamsEventResponse {BatchItemFailures = failedItems};
}
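The handler relies on an `Executor` type that is not shown in this post. A hypothetical minimal version could look like the sketch below; the `Pk` key name comes from the table definition, and everything else is an assumption for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Amazon.Lambda.DynamoDBEvents;

// Hypothetical stand-in for the Executor used by the handler; not the original implementation.
public class Executor
{
  // Processes a single stream record. Because the method is async, an exception
  // ends up in the returned task instead of being thrown while the handler
  // builds its task list.
  public async Task Execute(DynamoDBEvent.DynamodbStreamRecord record)
  {
    if (!record.Dynamodb.Keys.TryGetValue("Pk", out var pk) || string.IsNullOrEmpty(pk.S))
    {
      throw new InvalidOperationException($"Record {record.EventID} has no string Pk key.");
    }

    // Placeholder for the real work, e.g. calling a downstream service.
    await Task.Yield();
    Console.WriteLine($"Processed {pk.S}");
  }
}
```

Keeping `Execute` async matters for the handler: a synchronous throw would escape from `ToList()` before `Task.WhenAll` is reached, so no partial failure would be reported.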

And finally, the source code is in the GitHub repository, on the add-dynamo-db-lambda branch.

Serverless integration tests

A public feed with available products that updates every day

Business problem

  • Integration tests for serverless solution

Requirements

  • Docker

Implementation

  1. S3 public buckets with available documents
  2. Lambda updates document