DEV Community

Emma Moinat

Building an OpenSearch Index from DynamoDB with CDK

Introduction

We will be looking at how to set up an OpenSearch index from a DynamoDB table. We will assume you have some knowledge of DynamoDB and Lambda, and that you are familiar with using CDK to deploy infrastructure into AWS.

DynamoDB

Firstly, let's think about our DynamoDB table and how to set it up in a way that it is ready to be indexed. This is actually very straightforward and utilises a DynamoDB stream.

"A DynamoDB stream is an ordered flow of information about changes to items in a DynamoDB table." - AWS

Using CDK your table might look like:

const userTable = new dynamodb.Table(this, "UserTable", {
  tableName: "user-table",
  billingMode: BillingMode.PAY_PER_REQUEST,
  partitionKey: {name: "partitionKey", type: AttributeType.STRING},
  sortKey: {name: "sortKey", type: AttributeType.STRING},
  pointInTimeRecovery: true,
  stream: StreamViewType.NEW_IMAGE // This is the important line!
});

If you are using the console instead of CDK see this.

There are different types of streams:

KEYS_ONLY - Only the key attributes of the modified item are written to the stream.
NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream.
OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream.
NEW_AND_OLD_IMAGES - Both new and old item images of the item are written to the stream.

Here we have chosen NEW_IMAGE because we only need to know the new item to index.

This will create a table with a DynamoDB stream, which means any new, updated or deleted item events will be streamed into a place of your choosing; we have chosen a Lambda.
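
To make the stream view type more concrete, here is a rough sketch of what the records reaching the Lambda could look like with NEW_IMAGE. The types are simplified local stand-ins for the ones in the "aws-lambda" types package, and the sample values and the hasIndexableImage helper are hypothetical, not part of the original code.

```typescript
// Simplified local types that mirror the shape of a DynamoDB stream record.
// In a real Lambda these would come from the "aws-lambda" types package.
type AttributeValue = { S?: string; N?: string; BOOL?: boolean };
type AttributeMap = Record<string, AttributeValue>;

interface StreamRecord {
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb: {
    Keys: AttributeMap;
    NewImage?: AttributeMap; // only present for INSERT and MODIFY
  };
}

// Hypothetical records for the user table; all values are made up.
const insertRecord: StreamRecord = {
  eventName: "INSERT",
  dynamodb: {
    Keys: { partitionKey: { S: "user-1" }, sortKey: { S: "profile" } },
    NewImage: {
      partitionKey: { S: "user-1" },
      sortKey: { S: "profile" },
      name: { S: "Ada" },
    },
  },
};

// A REMOVE event still carries the keys, but there is no new image to index.
const removeRecord: StreamRecord = {
  eventName: "REMOVE",
  dynamodb: {
    Keys: { partitionKey: { S: "user-1" }, sortKey: { S: "profile" } },
  },
};

// True when the record carries an item image that can be sent to the index.
function hasIndexableImage(record: StreamRecord): boolean {
  return record.eventName !== "REMOVE" && record.dynamodb.NewImage !== undefined;
}
```

Because the keys are always present, NEW_IMAGE is enough to handle deletes as well as inserts and updates.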

Lambda

So, next up, we must think about the indexing Lambda. There is currently no direct way to index your data from a stream to the OpenSearch domain, so we must add a middle man to do the work. More on this can be found here.

The code for this Lambda and a few other helpful Lambdas can be found here on GitHub. This Lambda lives in the index-stream directory.

Here is a code snippet of this Lambda's handler:

import { DynamoDBStreamEvent } from "aws-lambda";
import { DynamoDB } from "aws-sdk";
// User, indexDocumentInOpenSearch and removeDocumentFromOpenSearch
// are assumed to be defined elsewhere in the project.

export const handler = async (event: DynamoDBStreamEvent): Promise<void> => {
  console.log("Received event from the user table");

  for (const record of event.Records) {
    if (!record.eventName || !record.dynamodb || !record.dynamodb.Keys) continue;

    const partitionKey = record.dynamodb.Keys.partitionKey.S;
    const sortKey = record.dynamodb.Keys.sortKey.S;
    // Note here that we are using a pk and sk 
    // but maybe you are using only an id, this would look like:
    // const id = record.dynamodb.Keys.id.S;

    try {
      if (record.eventName === "REMOVE") {
        // performing a DELETE request to your index
        // (no return here, so the rest of the batch is still processed)
        await removeDocumentFromOpenSearch(partitionKey, sortKey);
      } else {
        // There are 2 types of events left to handle, INSERT and MODIFY, 
        // which will both contain a NewImage
        if (!record.dynamodb.NewImage) continue;

        const userDocument = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage) as User;
        // performing a PUT request to your index
        await indexDocumentInOpenSearch(userDocument, partitionKey, sortKey);
      }
    } catch (error) {
      console.error("Error occurred updating OpenSearch domain", error);
      // rethrow so the failed batch is retried by the event source
      throw error;
    }
  }
};
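
The handler relies on two helpers that send the PUT and DELETE requests. Their real implementation is in the repository; the sketch below only illustrates one way the requests could be shaped. The function names, the "|" separator in the document ID and the OpenSearchRequest type are all assumptions made for illustration, and sending and signing the request is left out.

```typescript
// Assumed index name for this sketch.
const INDEX_NAME = "user-index";

// Combine the partition key and sort key into a single document ID.
// The "|" separator is an arbitrary choice for this sketch.
function toDocumentId(partitionKey: string, sortKey: string): string {
  return `${partitionKey}|${sortKey}`;
}

// Hypothetical description of the HTTP request to send to the domain.
interface OpenSearchRequest {
  method: "PUT" | "DELETE";
  path: string;
  body?: string;
}

// PUT /<index>/_doc/<id> creates the document or replaces an existing one.
function buildIndexRequest(
  document: object,
  partitionKey: string,
  sortKey: string
): OpenSearchRequest {
  const id = encodeURIComponent(toDocumentId(partitionKey, sortKey));
  return {
    method: "PUT",
    path: `/${INDEX_NAME}/_doc/${id}`,
    body: JSON.stringify(document),
  };
}

// DELETE /<index>/_doc/<id> removes the document from the index.
function buildRemoveRequest(partitionKey: string, sortKey: string): OpenSearchRequest {
  const id = encodeURIComponent(toDocumentId(partitionKey, sortKey));
  return { method: "DELETE", path: `/${INDEX_NAME}/_doc/${id}` };
}
```

Deriving the document ID from the table keys means an INSERT, a later MODIFY and a final REMOVE for the same item all target the same document in the index.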

In CDK you can create your Lambda as follows:

const userTableIndexingFunction = new Function(this, "UserTableIndexingFunction", {
  functionName: "UserTableIndexingFunction",
  code: Code.fromAsset("user-table-indexing-lambda-dist-folder"),
  runtime: Runtime.NODEJS_16_X,
  handler: "index.handler"
});

Then we can add the DynamoDB stream as an event source for this Lambda.

userTableIndexingFunction.addEventSource(new DynamoEventSource(userTable, {
  startingPosition: StartingPosition.TRIM_HORIZON,
  batchSize: 1, // Our Lambda could handle more than 1 as well because of the for loop
  retryAttempts: 3
}));

There are 2 starting positions available for a DynamoDB stream:

TRIM_HORIZON - Start reading at the last untrimmed record in the shard, 
               which is the oldest data record in the shard. 
               In other words, the Lambda will process every event still held in the stream, 
               in chronological order (oldest event to most recent event).

      LATEST - Start reading just after the most recent record in the shard, 
               so that you always read the most recent data in the shard. 
               In other words, the Lambda will skip the events already in the stream and 
               only process events written after the event source is connected.

For this example, we therefore use TRIM_HORIZON so that events already in the stream when the Lambda is connected are indexed too. Note that a DynamoDB stream only keeps records for 24 hours, so older items are not covered by the stream.

OpenSearch

Now, let's look at the actual OpenSearch domain setup. AWS suggests some substantial power (and therefore money) for a production-ready domain. You can find the best practices here. For this example we will use a very small setup with no redundancy; feel free to scale this up based on your needs:

const openSearchDomain = new Domain(this, "OpenSearchDomain", {
  version: EngineVersion.OPENSEARCH_1_0,
  capacity: {
    dataNodeInstanceType: "t3.small.search",
    dataNodes: 1,
    masterNodes: 0
  },
  ebs: {
    enabled: true,
    volumeSize: 50,
    volumeType: EbsDeviceVolumeType.GENERAL_PURPOSE_SSD
  }
});

This will deploy your OpenSearch domain. This can take some time, so be patient.

One final thing to think about is granting your Lambda the rights to read and write to your domain. In your stack with your OpenSearch domain, add this:

openSearchDomain.grantIndexReadWrite("user-index", userTableIndexingFunction);

This will allow your Lambda to do its job.

That's it, you are all set up and ready to index any new data into your OpenSearch index.

For indexing existing data, you can find a helpful Lambda under the index-data directory here on GitHub.
