In my previous blog post, I described how to capture data changes in a relational database and push them as records in a Kinesis data stream.
In this new post, I will describe how to convert those database operations into business events of potential interest in near real-time, so other applications can react to them.
With the solution deployed in the previous post, now I have at my disposal a steady stream of all the changes that are committed to tables of interest in the source database. Now, it's a matter of consuming those changes and turning them into a more actionable construct: business events.
For that, I am going to use two AWS Lambda functions together with Lambda's ability to filter event sources from Amazon Kinesis Data Streams. This filtering will help me reduce the number of invocations, lowering cost and potentially allowing my application to scale more easily.
When the Lambda functions receive new change data capture (CDC) records, they will, based on some business logic, publish events into the default Amazon EventBridge event bus. The idea behind these functions is to convert database operations into "business events" that carry more business context in their definition and data. Once those events are published in EventBridge, you can use the rich variety of supported EventBridge targets to build event-driven applications that react to them.
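Before going further, it helps to have the shape of those CDC records in mind. The sketch below shows the kind of JSON object the replication task delivers in each Kinesis record; the field names (data, before, metadata, operation, schema-name, table-name, record-type, timestamp) are the ones the functions and filters below rely on, while the concrete values are invented for illustration and other DMS metadata fields are omitted.

```python
# Illustrative only: shape of a DMS CDC record as delivered in the Kinesis payload.
# Field names match what the functions and filters below rely on; values are invented.
sample_cdc_record = {
    "data": {             # column values after the change
        "ProductReviewID": 123,
        "ProductID": 937,
        "Rating": 2,
    },
    "before": {           # previous column values (present for update operations)
        "Rating": "5",
    },
    "metadata": {
        "timestamp": "2022-03-01T10:15:30Z",
        "record-type": "data",
        "operation": "update",
        "schema-name": "Production",
        "table-name": "ProductReview",
    },
}
```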
For this example, I will publish two business events:
- NegativeProductReviewReceived, issued when a product receives a bad review (a rating below 4 stars).
- ProductStockOverbooked, issued when a transaction is filed with a quantity of a product above the available quantity in stock. Note that this is just to illustrate the use case; depending on the real business logic, a system might not allow an order for quantities above the existing stock. A downstream consumer could subscribe to either event with an EventBridge rule, as sketched below.
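To give an idea of the consuming side (this is not part of the solution deployed in this post), a downstream application could subscribe to either event with an EventBridge rule on the default bus. The rule name and the SQS target below are hypothetical; only the source and detail-type values match what the functions publish later.

```python
import json
import boto3

events = boto3.client("events")

# Hypothetical rule that matches the NegativeProductReviewReceived events
# published by the reviews function further down in this post.
events.put_rule(
    Name="negative-review-rule",  # illustrative name
    EventBusName="default",
    EventPattern=json.dumps({
        "source": ["com.example.cdc2event"],
        "detail-type": ["NegativeProductReviewReceived"],
    }),
    State="ENABLED",
)

# Route matched events to a hypothetical SQS queue consumed by another team.
events.put_targets(
    Rule="negative-review-rule",
    EventBusName="default",
    Targets=[{
        "Id": "negative-review-queue",
        "Arn": "arn:aws:sqs:eu-west-1:123456789012:negative-review-queue",  # placeholder ARN
    }],
)
```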
As mentioned above, I'll use two separate Lambda functions to process records from the two source tables involved. I could use a single function with two filters, but I think it makes more architectural sense this way, because it's likely that two separate teams would own those business areas.
The next sections describe the logic for each function.
Lambda function that processes changes on the ProductReview table
This function will publish the NegativeProductReviewReceived event. It will be triggered every time there is an update or insert operation on the Production.ProductReview table and the review has a rating below 4. Although the event filter will be configured to check for those specific conditions, I will also include that logic in the function code. This way, if I ever want this function to generate other events, I can simply loosen the triggering conditions and add the new logic in the code. Note that, since the logic already lives in the function, I could configure a more permissive filter from the start, but keeping the filter strict avoids unnecessary function invocations and showcases how powerful the Lambda event filtering feature is.
Some additional logic in the function handles the special case of an update: the function checks that the previous rating was not already below 4, which avoids issuing an event for the same negative review twice.
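To make the trade-off between filter and code more concrete, here is a sketch of both filtering approaches expressed as Python dictionaries for readability. The stricter pattern is the one that appears (as a JSON string) in the SAM template later; the permissive variant is hypothetical and shown only for comparison.

```python
# Strict pattern (the one deployed later): the function is only invoked for
# inserts/updates on Production.ProductReview with a Rating below 4.
strict_filter_pattern = {
    "data": {
        "metadata": {
            "operation": ["update", "insert"],
            "schema-name": ["Production"],
            "table-name": ["ProductReview"],
            "record-type": ["data"],
        },
        "data": {"Rating": [{"numeric": ["<", 4]}]},
    }
}

# Hypothetical, more permissive pattern: every change on the table triggers the
# function and the rating logic lives only in the code. Easier to evolve, but it
# pays for invocations that end up publishing nothing.
permissive_filter_pattern = {
    "data": {
        "metadata": {
            "schema-name": ["Production"],
            "table-name": ["ProductReview"],
            "record-type": ["data"],
        }
    }
}
```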
Lambda function that processes changes on the TransactionHistory table
This function will publish the ProductStockOverbooked event. It will be triggered every time there is an update or insert operation on the Production.TransactionHistory table. It will query the source database for the existing stock of the product in the transaction and, if the stock quantity is lower than the quantity in the transaction, it will issue the ProductStockOverbooked event.
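Stripped of connection handling, retries, and event publishing, the core decision reduces to the sketch below; the inventory rows and the transaction are invented, and the deployed function performs the aggregation in SQL with a single round-trip, as you'll see in its code.

```python
# Simplified sketch of the overbooking check. The real function aggregates stock
# per product in SQL (SUM(Quantity) GROUP BY ProductID) in one query, shown later.
inventory_rows = [  # invented (ProductID, Quantity) rows across inventory locations
    (937, 6),
    (937, 4),
    (712, 20),
]

stock = {}
for product_id, quantity in inventory_rows:
    stock[product_id] = stock.get(product_id, 0) + quantity   # {937: 10, 712: 20}

transaction = {"ProductID": 937, "Quantity": 15}               # invented transaction
overbooked = transaction["Quantity"] > stock[transaction["ProductID"]]
print(overbooked)  # True -> a ProductStockOverbooked event would be published
```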
The architecture
This is the architecture of the solution. The most notable characteristic is that the Lambda function that processes the TransactionHistory table is configured to access resources in the same VPC where the AWS DMS replication instance sits. This provides connectivity to the source database so the function can query the data it needs for its business logic. Because that function sits inside the VPC but still needs to reach EventBridge to publish events, I will also deploy a VPC endpoint for that service.
Deploying the solution
To make it easier to package and deploy the Lambda functions and all the necessary resources (e.g.: IAM roles for the functions), I will use a very convenient tool for working with serverless applications: the AWS Serverless Application Model (SAM). SAM uses templates similar to those of AWS CloudFormation, but it takes care of downloading and packaging all your functions' dependencies into a .zip file or a Docker container image, depending on the package type you choose. Furthermore, if you have runtime-specific dependencies (e.g.: Python packages that use C/C++ extensions), you can easily build a Lambda-compatible package by using Docker locally and the --use-container flag when building the Lambda package. Finally, SAM uploads the function package or container image to Amazon S3 or Amazon ECR, respectively, so you can deploy them using CloudFormation.
If you want to follow the next steps, you need to install the AWS SAM CLI. SAM provides project templates to initialize different kinds of sample serverless applications. I will skip that part, but I encourage you to check the built-in project templates or create your own using Cookiecutter.
- Set up the following project directory and file structure under a folder of your choice (e.g.: cdc2event):
```
cdc2event
├── reviews
│   ├── app.py
│   └── requirements.txt
├── transactions
│   ├── app.py
│   └── requirements.txt
└── template.yaml
```
- Edit the app.py file under the reviews directory and paste the following code. You can read details about what the code does in the comments.
```python
import base64
import json
from dateutil.parser import parse
import boto3
import logging

logger = logging.getLogger(name="cdc2eventLogger")
logger.setLevel(level=logging.DEBUG)

event_bridge = boto3.client("events")


def lambda_handler(event, context):
    """
    This function processes CDC records from inserted and updated product reviews
    that have a rating below 4. For each of the records, it will publish an event
    in Amazon EventBridge. If the record has been updated, it will only publish an
    event if the previous value was above or equal to 4.
    """
    events = []  # This list will contain the events to publish on the EventBridge event bus.

    # An event can contain several Kinesis records.
    # Each Kinesis record contains data encoded in Base64, so it has to be decoded to a string.
    # The replication task was configured to deliver JSON objects, so these have
    # to be parsed into Python objects using the json package.
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"])
        data = json.loads(payload)
        logger.debug(f"Processing data: {data}")

        # If the record comes from a database update operation,
        # check whether the rating value was above or equal to 4.
        if data["metadata"]["operation"] == "update" and int(data["before"]["Rating"]) >= 4 or \
                data["metadata"]["operation"] == "insert" and data["data"]["Rating"] < 4:
            # Create an EventBridge event and add it to the list
            events.append(
                create_event(data, "com.example.cdc2event")
            )
        else:
            logger.debug(f"Ignoring event: {data}")

    # Publish the events in the default EventBridge bus
    logger.debug(f"Publishing to EventBridge: {events}")
    if events:
        event_bridge.put_events(
            Entries=events
        )
    return


def create_event(record, source):
    """
    This function returns an EventBridge event based on the information within a record
    """
    # The detail of the event is a dictionary with any arbitrary structure.
    # In this case it contains all the information from the data and metadata items
    # provided by DMS, as it may be relevant for applications downstream.
    # Depending on the use case, removing certain fields might be needed for regulatory purposes.
    event = {
        "Time": parse(record["metadata"]["timestamp"]),
        "Source": source,
        "DetailType": "NegativeProductReviewReceived",
        "Detail": json.dumps({
            "data": record["data"],
            "metadata": record["metadata"]
        })
    }
    return event
```
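If you want to sanity-check this handler locally before deploying, something like the following sketch can help; it is my own addition rather than part of the solution. It wraps a hand-crafted CDC record into the Kinesis event envelope the function expects. Note that it needs an AWS region and credentials configured, and if the record qualifies the function will publish a real event to your default bus.

```python
# Minimal local smoke test for reviews/app.py (illustrative, not part of the deployment).
# Requires a configured AWS region/credentials, because the module creates a boto3 client
# and, if the record qualifies, calls PutEvents against the default event bus.
import base64
import json

from app import lambda_handler  # the reviews function defined above

cdc_record = {
    "data": {"ProductReviewID": 123, "ProductID": 937, "Rating": 2},
    "metadata": {
        "timestamp": "2022-03-01T10:15:30Z",
        "record-type": "data",
        "operation": "insert",
        "schema-name": "Production",
        "table-name": "ProductReview",
    },
}

# Kinesis delivers the payload Base64-encoded under Records[].kinesis.data.
kinesis_event = {
    "Records": [
        {"kinesis": {"data": base64.b64encode(json.dumps(cdc_record).encode()).decode()}}
    ]
}

lambda_handler(kinesis_event, None)
```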
- That piece of code uses an external Python package, so you need to include it in the Lambda package. For that, edit the requirements.txt file under the reviews directory and paste the following text.
```
python-dateutil
```
- Edit the app.py file under the transactions directory and paste the following code. Check the comments for further insights into it.
```python
import os
import base64
import json
import logging
from dateutil.parser import parse
import boto3
import pymssql

logger = logging.getLogger(name="cdc2eventLogger")
logger.setLevel(level=logging.DEBUG)

eventbridge = boto3.client("events")
secretsmanager = boto3.client("secretsmanager")
connection_details = json.loads(
    secretsmanager.get_secret_value(SecretId=os.environ["DB_SECRET_ARN"])["SecretString"])
database = os.environ["DB_NAME"]

MAX_DB_READ_ATTEMPTS = 3


def connect_to_database():
    global db_connection
    db_connection = None
    try:
        db_connection = pymssql.connect(
            host=connection_details["host"],
            port=connection_details["port"],
            user=connection_details["username"],
            password=connection_details["password"],
            database=database
        )
    except pymssql.InterfaceError as e:
        logger.debug(f"Error connecting to the database, please check the host name.\n {e}")
    except pymssql.DatabaseError as e:
        logger.debug(f"Error connecting to the database, check the credentials. \n {e}")
    return db_connection


# Initialize the connection in the global scope.
# This way, the connection will be reused across function executions.
db_connection = connect_to_database()


def lambda_handler(event, context):
    """
    This function processes CDC records from inserted and updated transactions.
    For each transaction that requests a quantity above the available stock,
    it will publish an event in Amazon EventBridge.
    """
    events = []  # This list will contain the events to publish on the EventBridge event bus.

    # An event can contain several Kinesis records.
    # Each Kinesis record contains data encoded in Base64, so it has to be decoded to a string.
    # The replication task was configured to deliver JSON objects, so these have
    # to be parsed into Python objects using the json package.
    records = map(decode_kinesis_record, event["Records"])

    # Filter out the records where the quantity has not changed to avoid duplicate events.
    records = list(filter(
        lambda x: x["data"]["Quantity"] != int(x.get("before", {}).get("Quantity", "-1")),
        records
    ))
    logger.debug(f"Processing changed records: {records}")
    if len(records) == 0:
        logger.debug("No records to process.")
        return

    # Extract all the product identifiers in all the records to be processed.
    # This way, there is only one round-trip to the database
    product_ids = [record["data"]["ProductID"] for record in records]
    stock = get_stock(product_ids)

    # Check that the inventory read was successful
    if stock:
        # Create an event for every order that has a quantity above the aggregated stock in inventory
        for record in records:
            product_id = record["data"]["ProductID"]
            if record["data"]["Quantity"] > stock[str(product_id)]:
                # Add the stock quantity to the business event
                record["data"]["StockQuantity"] = stock[str(product_id)]
                events.append(
                    create_event(record, "com.example.cdc2event")
                )
            else:
                logger.debug(f"Ignoring event: {record}")

    # Publish the events in the default EventBridge bus
    logger.debug(f"Publishing to EventBridge: {events}")
    if events:
        eventbridge.put_events(
            Entries=events
        )
    return


def decode_kinesis_record(record):
    """
    Decodes a CDC record and returns a python object
    """
    payload = base64.b64decode(record["kinesis"]["data"])
    data = json.loads(payload)
    return data


def get_stock(product_ids, attempt=1):
    """
    Reads the aggregated stock in inventory of the products provided in the input parameter.
    Returns a dictionary where each key is the product id and the value is the quantity in stock.
    If the connection to the database is not successful, it will retry up to MAX_DB_READ_ATTEMPTS times.
    """
    global db_connection
    stock = {}
    if db_connection and attempt < MAX_DB_READ_ATTEMPTS:
        cursor = db_connection.cursor()
        ids = ",".join(map(str, product_ids))
        query = f"SELECT ProductId, SUM([Quantity])\
                  FROM [Production].[ProductInventory]\
                  WHERE [ProductId] IN ({ids})\
                  GROUP BY [ProductId]"
        logger.debug(f"Executing query: {query}")
        cursor.execute(query)
        row = cursor.fetchone()
        while row:
            stock[str(row[0])] = row[1]
            logger.debug(f"ProductId = {row[0]}, Inventory Quantity = {row[1]}")
            row = cursor.fetchone()
    elif attempt < MAX_DB_READ_ATTEMPTS:
        logger.debug("Attempting to reconnect to the database...")
        db_connection = connect_to_database()
        # Capture the result of the retry so it is not silently discarded.
        stock = get_stock(product_ids=product_ids, attempt=attempt + 1)
    return stock


def create_event(record, source):
    """
    This function returns an EventBridge event based on the information within a record
    """
    # The detail of the event is a dictionary with any arbitrary structure.
    # In this case it contains all the information from the data and metadata items
    # provided by DMS, as it may be relevant for applications downstream.
    # Depending on the use case, removing certain fields might be needed for regulatory purposes.
    event = {
        "Time": parse(record["metadata"]["timestamp"]),
        "Source": source,
        "DetailType": "ProductStockOverbooked",
        "Detail": json.dumps({
            "data": record["data"],
            "metadata": record["metadata"]
        })
    }
    return event
```
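One caveat that applies to both functions: the EventBridge PutEvents API accepts at most 10 entries per call, and a busy Kinesis batch could yield more events than that. Neither function above handles this, so here is a small helper, as a sketch you could drop into either app.py, that chunks the entries before publishing.

```python
def publish_in_batches(client, entries, batch_size=10):
    """Publish EventBridge entries in chunks of at most batch_size (the PutEvents limit is 10 per call).

    Returns the total number of entries EventBridge reported as failed.
    """
    failed = 0
    for start in range(0, len(entries), batch_size):
        response = client.put_events(Entries=entries[start:start + batch_size])
        failed += response.get("FailedEntryCount", 0)
    return failed


# Example usage inside either handler, replacing the direct put_events call
# (the client variable is named eventbridge here and event_bridge in the reviews function):
#     if events:
#         failed = publish_in_batches(eventbridge, events)
#         if failed:
#             logger.error(f"{failed} event(s) failed to publish")
```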
- This piece of code also uses external packages, so similarly, edit the requirements.txt file under the transactions directory and paste the following text. Note that pymssql uses native extensions, so the SAM CLI feature that builds the package inside a Lambda-compatible container will come in handy.
```
pymssql>=2.2.3
python-dateutil
```
- At this step, fill in the template.yaml file with the following SAM template. I'll highlight a couple of points of interest:
- Each Lambda function uses a different architecture. One uses arm64, which runs on AWS Graviton processors and offers very good price performance; the other uses x86_64 because, at the time of writing, there is no ARM-compatible build of the pymssql package.
- Note that there are no explicit IAM roles defined in the template. SAM creates a role with basic permissions for a Lambda function if you don't specify one explicitly. The nice thing is that you can also attach additional policies to that implicit role: SAM policy templates, AWS managed policies, customer managed policies, or inline policies of your own. In this case, I opted for two SAM policy templates, EventBridgePutEventsPolicy and AWSSecretsManagerGetSecretValuePolicy, which make it straightforward to grant those self-describing permissions to the functions.
```yaml
AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: SAM Template for resources in the III unlocking innovation blog post

# Sets the timeout for all the functions in the application
Globals:
  Function:
    Timeout: 10

Parameters:
  SourceStreamName:
    Type: String
    Default: mssql-cdc
    Description: Name of the Kinesis data stream to use as source
  VpcId:
    Type: AWS::EC2::VPC::Id
    Description: Id of the VPC that has connectivity to the source database. A VPC Endpoint for EventBridge will be deployed there.
  SubnetIds:
    Type: List<AWS::EC2::Subnet::Id>
    Description: List of SubnetIds that have connectivity to the source database. A VPC Endpoint for EventBridge will deploy an ENI on each subnet.
  VpcEndpointSecurityGroupIds:
    Type: List<AWS::EC2::SecurityGroup::Id>
    Description: List of SecurityGroupIds for the VPC Endpoint for EventBridge.
  SecurityGroupIds:
    Type: List<AWS::EC2::SecurityGroup::Id>
    Description: List of SecurityGroupIds that allow connecting to the source database
  DatabaseConnectionSecretArn:
    Type: String
    Description: ARN of the AWS SecretsManager secret containing connection details for the source database
  DatabaseName:
    Type: String
    Default: AdventureWorks2019
    Description: Name of the source database from where to read the current inventory

Resources:
  # This VPC endpoint is needed because the TransactionHistoryProcessingFunction will have ENIs deployed in private subnets
  EventBridgeVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      PrivateDnsEnabled: true
      SecurityGroupIds: !Ref VpcEndpointSecurityGroupIds
      ServiceName: !Sub com.amazonaws.${AWS::Region}.events
      SubnetIds: !Ref SubnetIds
      VpcEndpointType: Interface
      VpcId: !Ref VpcId

  ProductReviewProcessingFunction:
    Type: AWS::Serverless::Function
    Properties:
      Description: This function processes records from the CDC-populated Kinesis data stream related to the ProductReview table that match certain conditions
      CodeUri: reviews/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - arm64
      Events:
        KinesisSourceStream:
          Type: Kinesis
          Properties:
            Stream: !Sub
              - "arn:${AWS::Partition}:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${streamName}"
              - streamName: !Ref SourceStreamName
            StartingPosition: TRIM_HORIZON
            FilterCriteria:
              Filters:
                - Pattern: '{ "data": { "metadata": { "operation": ["update", "insert"], "schema-name": ["Production"], "table-name": ["ProductReview"], "record-type": ["data"] }, "data": { "Rating": [ {"numeric": ["<", 4]}] } } }'
      Policies:
        - EventBridgePutEventsPolicy:
            EventBusName: default

  TransactionHistoryProcessingFunction:
    Type: AWS::Serverless::Function
    Properties:
      Description: This function processes records from the CDC-populated Kinesis data stream related to the TransactionHistory table that match certain conditions
      CodeUri: transactions/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      VpcConfig:
        SecurityGroupIds: !Ref SecurityGroupIds
        SubnetIds: !Ref SubnetIds
      Environment:
        Variables:
          DB_SECRET_ARN: !Ref DatabaseConnectionSecretArn
          DB_NAME: !Ref DatabaseName
      Events:
        KinesisSourceStream:
          Type: Kinesis
          Properties:
            Stream: !Sub
              - "arn:${AWS::Partition}:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${streamName}"
              - streamName: !Ref SourceStreamName
            StartingPosition: TRIM_HORIZON
            FilterCriteria:
              Filters:
                - Pattern: '{ "data": { "metadata": { "operation": ["update", "insert"], "schema-name": ["Production"], "table-name": ["TransactionHistory"], "record-type": ["data"] }, "data": { "TransactionType": ["S"] } } }'
      Policies:
        - EventBridgePutEventsPolicy:
            EventBusName: default
        - AWSSecretsManagerGetSecretValuePolicy:
            SecretArn: !Ref DatabaseConnectionSecretArn
```
- Now it's time to build the template and deploy the solution. Make sure the AWS CLI is configured with an AWS principal that has the proper permissions and that you're using a profile attached to the AWS account you want to use. Note that SAM does some bootstrapping upon first use (e.g.: creating an S3 bucket for uploading the processed templates). For the deployment step, you'll need to provide some parameters, such as the name of the application and some CloudFormation behaviors, but the SAM CLI has the --guided option, which helps you enter those parameters and stores them locally for later reuse. Run the following commands in a terminal:
```bash
sam build --use-container
sam deploy --guided
```
In subsequent calls, after the SAM CLI has installed the dependencies locally and you have set the parameters, you can simply run sam build && sam deploy (unless you want to change either of the two).
Conclusion
After following this post, you now have a steady stream of business events that other applications can react to in an event-driven fashion. In the next blog post, I'll explore how to use AWS DMS and the existing CDC stream to build a more advanced use case.
Clean up
You can delete the resources you have deployed using the SAM CLI with the following command:
```bash
sam delete
```