Miljan Čabrilo

Syncing Data from DocumentDB to OpenSearch using Change Streams

Change streams are a feature of Amazon DocumentDB that provides a time-ordered sequence of the data change events that occur within a DocumentDB cluster. They can be enabled for an individual collection and configured to deliver the complete document rather than only the change that occurred. Change streams also integrate natively with Lambda functions, which gives us a wide array of possibilities.

In this tutorial, we will demonstrate step by step how to synchronize real-time data changes from a DocumentDB cluster to an OpenSearch domain using change streams and a Lambda function.

Final infrastructure
At the end of the tutorial, we will have the infrastructure shown in the image above. We will create a VPC, a DocumentDB cluster, an OpenSearch domain, an API Gateway, and four Lambda functions. Three functions will be exposed via the API Gateway: one for writing data, one for reading data, and one for configuring the DocumentDB collection. The fourth function, which is the most important one, will be connected to the change stream and perform the data synchronization. Both the functions and the infrastructure will be written in TypeScript and deployed using CDK. The repository containing the entire code can be found here. Let's get started!

VPC setup

We create a VPC using CDK's Vpc construct. This one-liner creates a VPC with public and private subnets in each Availability Zone it uses and sets up the network routing.

this.vpc = new Vpc(this, 'Vpc', { vpcName: 'change-streams-demo-vpc' });

Next, we create three security groups: one for Lambda functions, one for the DocumentDB cluster, and one for the OpenSearch domain. As the Lambda functions will perform CRUD operations on data stored in DocumentDB and OpenSearch, we add ingress rules to the DocumentDB and OpenSearch security groups, authorizing access from the Lambda security group. Additionally, we include a self-referencing ingress rule in the DocumentDB security group, which will be explained later on.

this.lambdaSecurityGroup = new SecurityGroup(this, 'LambdaSecurityGroup', {
    vpc: this.vpc
});

this.openSearchSecurityGroup = new SecurityGroup(this, 'OpenSearchSecurityGroup', {
    vpc: this.vpc
});

this.documentDbSecurityGroup = new SecurityGroup(this, 'DocumentDbSecurityGroup', {
    vpc: this.vpc
});

this.openSearchSecurityGroup.addIngressRule(this.lambdaSecurityGroup, Port.tcp(443), 'Access from Lambda functions');

this.documentDbSecurityGroup.addIngressRule(this.lambdaSecurityGroup, Port.tcp(27017), 'Access from Lambda functions');

this.documentDbSecurityGroup.addIngressRule(
    this.documentDbSecurityGroup,
    Port.tcp(27017),
    'Self referencing inbound rule - access from Event Source Mapping'
);

DocumentDB setup

We create a DocumentDB cluster using CDK's DatabaseCluster construct. The engineVersion is set to 4.0.0 because the Lambda integration used later in this tutorial does not work with DocumentDB 3.6 clusters. The DatabaseCluster creates a master user secret for us and stores it in Secrets Manager under the name defined in masterUser.secretName. We set the vpc and securityGroup properties to the previously created VPC and DocumentDB security group. To launch the cluster in private subnets, we set vpcSubnets.subnetType to SubnetType.PRIVATE_WITH_EGRESS. The DatabaseCluster will automatically select private subnets that have only outbound internet access. We also set the removalPolicy to RemovalPolicy.DESTROY to ensure the cluster is deleted when the stack is deleted, avoiding any unexpected costs.

this.documentDbCluster = new DatabaseCluster(this, 'DocumentDbCluster', {
    dbClusterName: 'change-streams-demo-cluster',
    engineVersion: '4.0.0',
    masterUser: {
        username: 'admin_user',
        secretName: 'documentdb/admin_user'
    },
    instanceType: InstanceType.of(InstanceClass.T3, InstanceSize.MEDIUM),
    vpcSubnets: {
        subnetType: SubnetType.PRIVATE_WITH_EGRESS
    },
    vpc: this.props.vpc,
    removalPolicy: RemovalPolicy.DESTROY,
    securityGroup: this.props.documentDbSecurityGroup
});

OpenSearch setup

To set up the OpenSearch domain, we utilize CDK's Domain construct. The properties vpc, securityGroups, and removalPolicy are set in the same manner as for the DocumentDB cluster. For the vpcSubnets property, we cannot use automatic subnet selection as we did in the DocumentDB setup. Instead, it is necessary to explicitly define exactly one private subnet since we only have one OpenSearch node.

For simplicity, we rely on IAM to authorize access to the OpenSearch domain. The Domain construct does not create a resource-based IAM policy on the domain, known as the domain access policy. This allows us to authorize access using identity-based policies, such as an IAM role for the Lambda function, without conflicting with the domain access policy. If you wish to explore OpenSearch security in more detail, check out the official documentation available here.

this.openSearchDomain = new Domain(this, 'OpenSearchDomain', {
    domainName: 'change-streams-demo-domain',
    version: EngineVersion.OPENSEARCH_2_3,
    vpc: this.props.vpc,
    vpcSubnets: [{ subnets: [this.props.vpc.privateSubnets[0]] }],
    securityGroups: [this.props.openSearchSecurityGroup],
    capacity: {
        dataNodes: 1,
        dataNodeInstanceType: 't2.small.search'
    },
    removalPolicy: RemovalPolicy.DESTROY
});

Lambda functions setup

Before we create the Lambda functions, we need to create an API Gateway that will be used to invoke the functions. Similar to other resources, we create the API Gateway using the RestApi construct. We also attach two resources, demo-data and config, to the API Gateway. Later on, we will attach a POST method to the demo-data resource for writing data to DocumentDB, as well as a GET method for reading data from OpenSearch. Additionally, we will attach a POST method to the config resource, which will be used to configure change streams on the DocumentDB collection.

const apiGateway = new RestApi(this, 'ApiGateway', {
    restApiName: 'change-streams-demo-api-gateway'
});

this.demoResource = apiGateway.root.addResource('demo-data');
this.configResource = apiGateway.root.addResource('config');

Writing data to DocumentDB cluster

To be able to write to the DocumentDB cluster, the writer Lambda function requires access to the cluster's master secret. So, we create an IAM role for our writer function that contains all the necessary permissions. In the inlinePolicies property, we add a new policy that grants access to the cluster's secret through the secretsmanager:GetSecretValue action. We also include the managed policy AWSLambdaVPCAccessExecutionRole, which provides all the permissions required for running a Lambda function in a VPC and writing logs to CloudWatch.

const documentDbAccessLambdaRole = new Role(this, 'DocumentDbAccessLambdaRole', {
    roleName: 'DocumentDbAccessLambdaRole',
    assumedBy: new ServicePrincipal('lambda.amazonaws.com'),
    managedPolicies: [ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaVPCAccessExecutionRole')],
    inlinePolicies: {
        documentDbAccessPolicy: new PolicyDocument({
            statements: [
                new PolicyStatement({
                    sid: 'DocumentDbSecretAccess',
                    resources: [this.props.documentDbSecretArn],
                    actions: ['secretsmanager:GetSecretValue']
                })
            ]
        })
    }
});

To create the Lambda function, we utilize the NodejsFunction construct. This construct simplifies the process of creating Lambda functions by automatically transpiling and bundling TypeScript or JavaScript code. Under the hood, it utilizes esbuild.

We assign the previously created VPC and security group to the Lambda function using the vpc and securityGroups properties. We configure two environment variables: DOCUMENT_DB_SECRET and DOCUMENT_DB_ENDPOINT. These variables store the ARN of the cluster's master secret and the endpoint of the cluster, respectively. The Lambda function will utilize these values to establish a connection with the DocumentDB cluster.

By default, the DocumentDB cluster uses TLS (Transport Layer Security). To establish a connection with the cluster, we need to verify its certificate using the AWS-provided Certificate Authority (CA) certificate. The file global-bundle.pem contains the AWS CA certificate. To make it available to the Lambda function during runtime, we use the afterBundling command hook, which copies global-bundle.pem to the Lambda deployment package.

Finally, we attach the Lambda function to the API Gateway as a POST method of the demo-data resource.

const writerFunction = new NodejsFunction(this, 'WriterLambda', {
    functionName: 'change-streams-demo-writer',
    entry: path.join(__dirname, '..', 'functions', 'writer.ts'),
    vpc: this.props.vpc,
    securityGroups: [this.props.lambdSecurityGroup],
    environment: {
        DOCUMENT_DB_SECRET: this.props.documentDbSecretArn,
        DOCUMENT_DB_ENDPOINT: this.props.documentDbEndpoint
    },
    role: roles.documentDbAccessLambdaRole,
    timeout: Duration.seconds(15),
    bundling: {
        commandHooks: {
            afterBundling: (inputDir: string, outputDir: string): string[] => [`cp ${inputDir}/certificate/global-bundle.pem ${outputDir}`],
            beforeBundling: (inputDir: string, outputDir: string): string[] => [],
            beforeInstall: (inputDir: string, outputDir: string): string[] => []
        }
    }
});
this.demoResource.addMethod('POST', new LambdaIntegration(writerFunction, { proxy: true }));

To connect to the DocumentDB cluster, we utilize the mongodb package. Within the createMongoClient() function, we first retrieve the master secret from Secrets Manager. Then we use this secret, along with the previously bundled CA certificate, to establish a connection with the cluster.

export async function createMongoClient(): Promise<MongoClient> {
    const secretsManager = new AWS.SecretsManager();

    const secret = await secretsManager.getSecretValue({ SecretId: process.env.DOCUMENT_DB_SECRET as string }).promise();
    const credentials = JSON.parse(secret.SecretString as string);

    const client = new MongoClient(`mongodb://${credentials.username}:${encodeURIComponent(credentials.password)}@${process.env.DOCUMENT_DB_ENDPOINT}`, {
        tls: true,
        tlsCAFile: 'global-bundle.pem',
        retryWrites: false
    });
    return client.connect();
}
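
The connection string assembled inside createMongoClient() can also be factored into a small pure function, which makes the escaping of the credentials easy to check in isolation. The sketch below is an illustration and not code from the repository: buildDocumentDbUri and DocDbCredentials are hypothetical names, the endpoint in the example is made up, and unlike the original snippet it also escapes the username.

```typescript
// Hypothetical type: the two fields of the master secret that the connection code reads.
interface DocDbCredentials {
    username: string;
    password: string;
}

// Builds a mongodb:// URI. Both credential parts are percent-encoded so that
// characters such as '@', ':' or '/' in a generated password cannot break the URI.
function buildDocumentDbUri(credentials: DocDbCredentials, endpoint: string): string {
    const user = encodeURIComponent(credentials.username);
    const password = encodeURIComponent(credentials.password);
    return `mongodb://${user}:${password}@${endpoint}`;
}

// Example with a made-up endpoint and a password containing reserved characters.
const exampleUri = buildDocumentDbUri(
    { username: 'admin_user', password: 'p@ss:w/rd' },
    'cluster.example.internal:27017'
);
console.log(exampleUri);
// prints "mongodb://admin_user:p%40ss%3Aw%2Frd@cluster.example.internal:27017"
```

The resulting string can be passed to the MongoClient constructor exactly as in createMongoClient(), together with the same TLS options.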

In the handler function, we simply instantiate a MongoClient instance and write the request's body to the demo-collection.

export async function handler(event: any) {
    console.log('Received event from API gateway: ', JSON.stringify(event));
    const client = await createMongoClient();

    const demoCollection = client.db('demo-db').collection('demo-collection');
    await demoCollection.insertOne(JSON.parse(event.body));

    await client.close();
    return { statusCode: 200 };
}
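
The handler above passes event.body straight to JSON.parse, so a missing or malformed body surfaces as an unhandled error. One way to guard against that is sketched below, assuming we want to answer such requests with a 400 status. The names parseDocumentBody, ProxyEventLike, and ParsedBody are hypothetical and do not appear in the repository.

```typescript
// Minimal slice of the API Gateway proxy event that the writer handler uses.
interface ProxyEventLike {
    body: string | null;
}

type ParsedBody =
    | { ok: true; document: Record<string, unknown> }
    | { ok: false; statusCode: 400; message: string };

// Validates the request body before it is handed to insertOne.
function parseDocumentBody(request: ProxyEventLike): ParsedBody {
    if (!request.body) {
        return { ok: false, statusCode: 400, message: 'Request body is required' };
    }
    let parsed: unknown;
    try {
        parsed = JSON.parse(request.body);
    } catch {
        return { ok: false, statusCode: 400, message: 'Request body is not valid JSON' };
    }
    // insertOne expects a single document, so arrays, strings, numbers and null are rejected.
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
        return { ok: false, statusCode: 400, message: 'Request body must be a JSON object' };
    }
    return { ok: true, document: parsed as Record<string, unknown> };
}
```

In the handler, a result with ok set to false could be returned as { statusCode: 400, body: message } before a connection to the cluster is opened at all.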

Enabling change streams

To utilize change streams, we need to enable them either for the entire DocumentDB database or for the specified collection. Since our DocumentDB cluster is deployed in a private subnet of the VPC, direct access to it is not possible. To overcome this limitation, we create a Lambda function responsible for configuring change streams on the demo collection. This Lambda function is deployed within the VPC and exposed through API Gateway, enabling invocation from outside the VPC.

In a real-world scenario, these configuration tasks would typically be performed either through a script during deployment, such as a CodeBuild job, or manually on the cluster if direct access is available (e.g., via a bastion host or VPN connection). For the purpose of this demo, setting up a Lambda function proves to be the simplest solution.

The setup for the configuration Lambda function follows the same steps as the writer function, so we can skip directly to the handler code. In the code, we create the demo-collection collection and execute an admin command to enable change streams on it.

export async function handler(event: any) {
    const client = await createMongoClient();
    const db = client.db('demo-db');

    await db.createCollection('demo-collection');
    await db.admin().command({
        modifyChangeStreams: 1,
        database: 'demo-db',
        collection: 'demo-collection',
        enable: true
    });

    await client.close();
    return { statusCode: 200 };
}
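
Because the handler always calls createCollection, invoking the config endpoint a second time may fail once the collection already exists. A minimal sketch of making that step repeatable follows. It assumes DocumentDB reports the same NamespaceExists error (code 48) that MongoDB uses, which should be verified against your cluster. The helpers isNamespaceExistsError and createIfMissing are hypothetical names.

```typescript
// MongoDB's server error code for "a collection with this name already exists".
// Assumption: DocumentDB reports the same code or codeName.
const NAMESPACE_EXISTS_CODE = 48;

// Returns true only for errors that look like a NamespaceExists server error.
function isNamespaceExistsError(error: unknown): boolean {
    if (typeof error !== 'object' || error === null) {
        return false;
    }
    const { code, codeName } = error as { code?: unknown; codeName?: unknown };
    return code === NAMESPACE_EXISTS_CODE || codeName === 'NamespaceExists';
}

// Runs the given create step and swallows only the "already exists" failure;
// every other error is rethrown unchanged.
async function createIfMissing(create: () => Promise<unknown>): Promise<'created' | 'already-exists'> {
    try {
        await create();
        return 'created';
    } catch (error) {
        if (isNamespaceExistsError(error)) {
            return 'already-exists';
        }
        throw error;
    }
}
```

In the config handler this could be used as await createIfMissing(() => db.createCollection('demo-collection')), followed by the modifyChangeStreams command as before.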

Event Source Mapping setup

An Event Source Mapping (ESM) is a Lambda resource that reads from an event source and triggers a Lambda function. In this case, we use an ESM to read change streams from the DocumentDB cluster and invoke the sync Lambda function. The ESM will handle the connection to the DocumentDB cluster, read the change stream events, group them into batches, and invoke the sync function. In the sync function, we will simply write the entire document to the OpenSearch domain.

To perform its tasks successfully, ESM requires the appropriate permissions both at the networking level and the IAM level. The ESM will "inherit" the security group of the DocumentDB cluster and utilize it when establishing a connection to the cluster. This is precisely why we included a self-referencing inbound rule in the security group of the DocumentDB cluster during the VPC setup. This rule allows the ESM to access the cluster successfully.

An ESM relies on the permissions granted by the function's execution role to read and manage items within the event source. Therefore, in the IAM role of the sync function, we include three statements (ESMNetworkingAccess, ESMDocumentDbAccess, ESMDocumentDbSecretAccess) that grant the necessary permissions required by the ESM. The ESMNetworkingAccess statement provides networking permissions, the ESMDocumentDbAccess statement grants DocumentDB management permissions, and the ESMDocumentDbSecretAccess statement allows the ESM to read the master secret of the cluster. We also include an OpenSearchAccess statement, which is utilized by the sync Lambda function itself. The actions es:ESHttpPost, es:ESHttpPut, and es:ESHttpGet within this statement grant the ability to read and write data to the domain or index defined in the resources field.

const syncLambdaRole = new Role(this, 'SyncLambdaRole', {
    roleName: 'SyncLambdaRole',
    assumedBy: new ServicePrincipal('lambda.amazonaws.com'),
    managedPolicies: [ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaVPCAccessExecutionRole')],
    inlinePolicies: {
        syncLambdaPolicy: new PolicyDocument({
            statements: [
                new PolicyStatement({
                    sid: 'ESMNetworkingAccess',
                    resources: ['*'],
                    actions: [
                        'ec2:CreateNetworkInterface',
                        'ec2:DescribeNetworkInterfaces',
                        'ec2:DescribeVpcs',
                        'ec2:DeleteNetworkInterface',
                        'ec2:DescribeSubnets',
                        'ec2:DescribeSecurityGroups',
                        'kms:Decrypt'
                    ]
                }),
                new PolicyStatement({
                    sid: 'ESMDocumentDbAccess',
                    resources: ['*'],
                    actions: ['rds:DescribeDBClusters', 'rds:DescribeDBClusterParameters', 'rds:DescribeDBSubnetGroups']
                }),
                new PolicyStatement({
                    sid: 'ESMDocumentDbSecretAccess',
                    resources: [this.props.documentDbSecretArn],
                    actions: ['secretsmanager:GetSecretValue']
                }),
                new PolicyStatement({
                    sid: 'OpenSearchAccess',
                    resources: [`${this.props.openSearchDomainArn}/*`],
                    actions: ['es:ESHttpPost', 'es:ESHttpPut', 'es:ESHttpGet']
                })
            ]
        })
    }
});
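
The OpenSearchAccess statement above covers every path under the domain. If we wanted to limit a role to a single index, the resources could be narrowed as sketched below. This is a plain-JSON illustration of the statement shape rather than the CDK code used in the repository. The helper name openSearchIndexStatement, the Sid, and the example account and region are assumptions.

```typescript
// Plain-JSON shape of an IAM policy statement (only the fields used here).
interface IamStatement {
    Sid: string;
    Effect: 'Allow';
    Action: string[];
    Resource: string[];
}

// Hypothetical helper: scopes OpenSearch HTTP actions to one index
// instead of '<domainArn>/*'.
function openSearchIndexStatement(domainArn: string, index: string, actions: string[]): IamStatement {
    return {
        Sid: 'OpenSearchIndexAccess',
        Effect: 'Allow',
        Action: actions,
        // '<domainArn>/<index>' covers the index itself,
        // '<domainArn>/<index>/*' covers its sub-paths such as _doc or _search.
        Resource: [`${domainArn}/${index}`, `${domainArn}/${index}/*`]
    };
}

// Example with a made-up account ID and region.
const indexStatement = openSearchIndexStatement(
    'arn:aws:es:eu-central-1:123456789012:domain/change-streams-demo-domain',
    'demo-index',
    ['es:ESHttpGet', 'es:ESHttpPost', 'es:ESHttpPut']
);
console.log(JSON.stringify(indexStatement, null, 2));
```

Requests that target paths outside the index, for example a domain-level _bulk call, would not be covered by such a statement, which is one reason the tutorial keeps the broader wildcard.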

The sync function is defined in the same way as the writer and config functions, using the NodejsFunction construct. So, we can continue to the ESM definition. In the ESM definition, we specify the sync function in the functionName property, the DocumentDB cluster in the eventSourceArn property, and the cluster's master secret in the sourceAccessConfigurations property.

Within the documentDbEventSourceConfig, we define the database and collection from which we want to read change streams. By specifying the value UpdateLookup in the fullDocument property, we indicate that we want to receive the entire document in the change stream event, rather than just the delta of the change.

We initially set the enabled property to false for the ESM. We will enable ESM later on, once we have set up change streams on the demo collection by invoking the config endpoint. If we were to enable ESM immediately, since it is created before invoking the config method, it would detect that change streams are not enabled, and we would need to restart it.

const esm = new CfnEventSourceMapping(this, 'DocumentDbEventSourceMapping', {
    functionName: this.syncFunction.functionName,
    eventSourceArn: `arn:aws:rds:${Stack.of(this).region}:${Stack.of(this).account}:cluster:${this.props.documentDbClusterIdentifier}`,
    sourceAccessConfigurations: [
        {
            type: 'BASIC_AUTH',
            uri: this.props.documentDbSecretArn
        }
    ],
    enabled: false,
    documentDbEventSourceConfig: {
        collectionName: 'demo-collection',
        databaseName: 'demo-db',
        fullDocument: 'UpdateLookup'
    }
});

To establish a connection with the OpenSearch domain, we use the Client class from the @opensearch-project/opensearch package. The Client class relies on the AwsSigv4Signer to obtain the credentials of the sync Lambda function and sign requests using the AWS SigV4 algorithm. This signing process is necessary because the OpenSearch domain uses IAM for authentication and authorization.

export function createOpenSearchClient(): Client {
    return new Client({
        ...AwsSigv4Signer({
            region: process.env.AWS_REGION as string,
            getCredentials: () =>
                new Promise((resolve, reject) => {
                    AWS.config.getCredentials((err, credentials) => {
                        if (err) {
                            reject(err);
                        } else {
                            resolve(credentials);
                        }
                    });
                })
        }),
        node: `https://${process.env.OPEN_SEACH_DOMAIN_ENDPOINT}`
    });
}

In the sync function code, we simply instantiate an OpenSearch client, iterate through the change stream events, and write them to the demo-index index.

export async function handler(changeStream: any) {
    const client = createOpenSearchClient();

    for (const { event } of changeStream.events) {
        console.log(`Received change stream event: `, JSON.stringify(event));

        const { _id, ...document } = event.fullDocument;
        await client.index({
            index: 'demo-index',
            id: _id.$oid,
            body: document
        });
    }
}
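
The handler above assumes that every event carries a fullDocument, which holds for inserts but not for deletes. The sketch below shows one way to decide, per event, whether to index, delete, or skip. The ChangeEvent type is an assumed, reduced model of the event payload (ObjectId keys serialized as $oid, as in the handler above), and toSyncAction and SyncAction are hypothetical names.

```typescript
// Assumed shape of one change stream event as delivered to the sync function.
// Only the fields used below are modelled; _id is assumed to be an ObjectId.
interface ChangeEvent {
    operationType: string;
    documentKey: { _id: { $oid: string } };
    fullDocument?: { _id: { $oid: string }; [field: string]: unknown };
}

type SyncAction =
    | { kind: 'index'; id: string; body: Record<string, unknown> }
    | { kind: 'delete'; id: string }
    | { kind: 'skip'; reason: string };

// Maps a change stream event to the operation to perform on the index.
function toSyncAction(change: ChangeEvent): SyncAction {
    const id = change.documentKey._id.$oid;
    if (change.operationType === 'delete') {
        // Deletes carry no fullDocument, only the key of the removed document.
        return { kind: 'delete', id };
    }
    if (!change.fullDocument) {
        return { kind: 'skip', reason: `no fullDocument for ${change.operationType}` };
    }
    // Strip _id from the body, as the original handler does.
    const { _id, ...body } = change.fullDocument;
    return { kind: 'index', id, body };
}
```

Inside the loop of the sync handler, an index action would map to client.index({ index: 'demo-index', id, body }) and a delete action to client.delete({ index: 'demo-index', id }), while skipped events could simply be logged.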

Reading data from OpenSearch domain

To retrieve data from the OpenSearch domain, we create a reader Lambda function and attach it to the API Gateway. The reader function requires the same OpenSearch permissions as the sync function to access the domain. We create an IAM role specifically for the reader function and, similar to the other functions, we include the managed policy AWSLambdaVPCAccessExecutionRole.

const openSearchAccessLambdaRole = new Role(this, 'OpenSearchAccessLambdaRole', {
    roleName: 'OpenSearchAccessLambdaRole',
    assumedBy: new ServicePrincipal('lambda.amazonaws.com'),
    managedPolicies: [ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaVPCAccessExecutionRole')],
    inlinePolicies: {
        openSearchAccessPolicy: new PolicyDocument({
            statements: [
                new PolicyStatement({
                    sid: 'OpenSearchAccess',
                    resources: [`${this.props.openSearchDomainArn}/*`],
                    actions: ['es:ESHttpPost', 'es:ESHttpPut', 'es:ESHttpGet']
                })
            ]
        })
    }
});

We create the reader function using the NodejsFunction construct. In the function's environment, we set the OPEN_SEACH_DOMAIN_ENDPOINT variable (the name, including its spelling, must match the one read in createOpenSearchClient()), and we attach the function to the GET method of the demo-data resource.

const readerFunction = new NodejsFunction(this, 'ReaderLambda', {
    functionName: 'change-streams-demo-reader',
    entry: path.join(__dirname, '..', 'functions', 'reader.ts'),
    vpc: this.props.vpc,
    securityGroups: [this.props.lambdSecurityGroup],
    environment: {
        OPEN_SEACH_DOMAIN_ENDPOINT: this.props.openSearchDomainEndpoint
    },
    role: roles.openSearchAccessLambdaRole,
    timeout: Duration.seconds(15)
});
this.demoResource.addMethod('GET', new LambdaIntegration(readerFunction, { proxy: true }));

In the function's code, we instantiate the OpenSearch client, query the demo index, and retrieve the data. We include the retrieved data in the body of the function's response, returning it to the caller.

export async function handler() {
    const client = createOpenSearchClient();

    const searchResponse = await client.search({
        index: 'demo-index',
        body: {
            query: {
                match_all: {}
            }
        }
    });

    await client.close();
    return { statusCode: 200, body: JSON.stringify(searchResponse.body.hits?.hits) };
}
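
The reader returns the raw hits, including OpenSearch metadata such as _index and _score. If callers only care about the documents themselves, the hits could be flattened first. The following sketch is an optional variation, not part of the repository, and hitsToDocuments and SearchHit are hypothetical names.

```typescript
// Subset of a search hit as found in searchResponse.body.hits.hits.
interface SearchHit {
    _id: string;
    _index?: string;
    _score?: number | null;
    _source?: Record<string, unknown>;
}

// Flattens hits into plain documents: the indexed fields from _source are kept
// and the OpenSearch _id is exposed as "id" (overriding any "id" field in _source).
function hitsToDocuments(hits: SearchHit[] | undefined): Array<Record<string, unknown>> {
    return (hits ?? []).map((hit) => ({ ...(hit._source ?? {}), id: hit._id }));
}

// Example using a hit shaped like the ones the reader endpoint returns.
const flattened = hitsToDocuments([
    {
        _index: 'demo-index',
        _id: '64ab1a06183828b741f81c38',
        _score: 1,
        _source: { author: { name: 'Miljan', surname: 'Cabrilo' } }
    }
]);
console.log(JSON.stringify(flattened));
// prints [{"author":{"name":"Miljan","surname":"Cabrilo"},"id":"64ab1a06183828b741f81c38"}]
```

With this helper, the reader's response body would become JSON.stringify(hitsToDocuments(searchResponse.body.hits?.hits)).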

Deploying and testing the synchronization

Before deploying the solution, the service-linked role for the OpenSearch service must exist in your account. When you perform operations through the AWS Console, this role is created automatically when required. Therefore, if you have previously set up an OpenSearch domain using the AWS Console, the role should already exist. If it does not, you can create it using the AWS CLI command shown below.

aws iam create-service-linked-role --aws-service-name es.amazonaws.com

The entire CDK code is organized into four stacks:

  • change-streams-demo-vpc-stack: Contains VPC definition and security groups.
  • change-streams-demo-documentdb-stack: Defines the DocumentDB cluster.
  • change-streams-demo-opensearch-stack: Sets up the OpenSearch domain.
  • change-streams-demo-lambda-stack: Creates the API Gateway and Lambda functions.

To deploy the entire solution, you can run the npm command shown below. By default, the command will use the account, region, and credentials from your default AWS profile.

npm run cdk deploy -- --all

After the deployment is completed, you will need to retrieve the URL of the API Gateway. Once you have the URL, the next step is to invoke the config endpoint. This will create the demo collection and enable change streams.

curl --location --request POST 'YOUR_API_GATEWAY_URL/prod/config'

After invoking the config endpoint, you need to enable the ESM. You can do this by executing the command below. The ID of the ESM is the value of the esm-id output of the change-streams-demo-lambda-stack stack. Alternatively, you can open the sync Lambda function in the AWS Console, select the ESM in the function's list of triggers, and enable it there.

aws lambda update-event-source-mapping --uuid YOUR_ESM_ID --enabled

Now you can start adding data to the DocumentDB cluster by invoking the POST method of the demo-data endpoint.

curl --location 'YOUR_API_GATEWAY_URL/prod/demo-data' \
--header 'Content-Type: application/json' \
--data '{
    "author": {
        "name": "Miljan",
        "surname": "Cabrilo"
    }
}'

Once the data is added, it will be synchronized to the OpenSearch domain. To retrieve the synchronized data, you can invoke the GET method of the demo-data endpoint.

curl --location 'YOUR_API_GATEWAY_URL/prod/demo-data'

The response from invoking the GET method of the demo-data endpoint should contain the same data that was added through the POST method. You can monitor the execution and logs of the Lambda function using the CloudWatch service.

[
    {
        "_index": "demo-index",
        "_id": "64ab1a06183828b741f81c38",
        "_score": 1,
        "_source": {
            "author": {
                "name": "Miljan",
                "surname": "Cabrilo"
            }
        }
    }
]

After testing the synchronization, you can delete the resources by invoking the command below. Stateful resources, such as the DocumentDB cluster and OpenSearch domain, are configured with the RemovalPolicy.DESTROY and will be deleted along with the stacks.

npm run cdk destroy -- --all

All created resources are tagged with the Application tag, which has the value change-streams-demo. Once the destroy command completes execution, you can double-check if all resources have been deleted by using the Tag Editor of the AWS Resource Groups service. The Tag Editor allows you to search for resources based on their tags. Any remaining resources can be deleted manually.

Conclusion

In this post, I have demonstrated how to achieve real-time data synchronization from a DocumentDB cluster to an OpenSearch domain using change streams and a Lambda function. The majority of the heavy lifting is handled by AWS on our behalf. For instance, the Event Source Mapping performs all the complex tasks, such as polling for changes and grouping them into batches, while we simply integrate our Lambda function into the flow.

The architecture example presented here can be used to enhance the search performance of an existing DocumentDB cluster by replicating its data into a search-optimized OpenSearch domain. This is just one example of the numerous possibilities that change streams offer. Since they are easily integrated with Lambda functions, we have the flexibility to use them in any way we desire. For instance, we could react to events within the DocumentDB cluster and trigger a Step Function or send notifications to users and more.

I hope you found this post useful and interesting. If you have any questions regarding the implementation or encounter any deployment issues, feel free to leave a comment below. I'll make sure to respond as promptly as possible.
