In part 2 of my series on Event Driven Step Functions we'll create a completely separate CDK application that observes the event traffic generated by the project in part 1. We'll use that event traffic to create a timeline of operations (even operations outside of the step function) and we'll also infer the architecture from the events. We'll store the events in DynamoDB and also set up a live WebSocket API for a real-time view.
For a personal challenge I wanted to see how far I could get by ONLY using observed event data with no additional IAM access. You could extend what's below with access to things like X-Ray, CloudWatch, etc.
We'll be making something that can do what's shown in the Demonstration section below.
What this post will cover:
- High level overview of the architecture
- Lambda functions related to observability
- Creating a useful event interface
What this post will NOT cover:
*I was experimenting with SST for this post, but I don't want this to be a review of SST or the focus of the post. SST is CDK-based.
The code for this post is located here: https://github.com/martzcodes/blog-sst-eventdriven-observability
Architecture for Observing Orchestrated Choreography
In part 1, all of the emitted events were sent to the default bus of the AWS account. In a multi-account scenario we would just need rules in place to forward events from the source bus to an observability bus in another account.
In order to process all of the events, we need a broad rule that sends every event to our targets. In this case, we emitted all of the events with the source project, so we can match on that.
With SST, rules and lambdas can be created as part of its EventBus construct:
const bus = new EventBus(stack, "Ordered", {
  cdk: {
    eventBus: CdkEventBus.fromEventBusName(stack, `DefaultBus`, "default"),
  },
  defaults: {
    function: {
      environment: {
        tableName: table.tableName,
        websocketUrl: socketApi.url,
      },
    },
  },
  rules: {
    observer: {
      pattern: {
        source: ['project'],
      },
      targets: {
        sendMessage: "functions/sendMessage.handler",
        eventTracker: "functions/eventTracker.handler",
      },
    },
  },
});
bus.attachPermissions([table, socketApi]);
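For reference, the kind of event this rule matches looks roughly like the sketch below. Only source: "project" matters for the pattern; the detail-type and state machine names here are made up for illustration:

// Illustrative event envelope (field names/values are hypothetical except the source)
const exampleObservedEvent = {
  account: "123456789012",
  source: "project", // this is what the observer rule matches on
  "detail-type": "putOrder", // hypothetical detail-type from the part 1 project
  detail: {
    // execution ARN of the state machine job, included in every emitted event
    execution:
      "arn:aws:states:us-east-1:123456789012:execution:OrderStateMachine:abc-123",
    // ...plus whatever else the emitting lambda included
  },
};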
In this architecture we have two targets: one to store the events in DynamoDB for tracking, and one to push each event to the WebSocket API so that clients can consume them in real time.
RestAPI
For our DynamoDB Table we'll be using the SST Table Construct:
const table = new Table(stack, "EventDrivenObservability", {
  fields: {
    pk: "string",
    sk: "string",
    ttl: "number",
  },
  timeToLiveAttribute: "ttl",
  primaryIndex: { partitionKey: "pk", sortKey: "sk" },
  cdk: {
    table: {
      removalPolicy: RemovalPolicy.DESTROY,
    },
  },
});
This has a similar interface to the CDK Table construct but allows us to specify known fields. We have to pass in an override for the removal policy since SST doesn't have an equivalent property.
💡 CDK overrides are typically passed in to SST constructs via a cdk property.
From there we'll create the two Lambda functions for the RestAPI:
const api = new Api(stack, "Api", {
  defaults: {
    function: {
      // Allow the API to access the table
      permissions: [table],
      // Pass in the table name to our API
      environment: {
        tableName: table.tableName,
      },
    },
  },
  routes: {
    "GET /latest": "functions/getLatestEvents.handler",
    "GET /job/{id}": "functions/getJobEvents.handler",
  },
});
For the eventTracker target in the EventBus construct and the getLatestEvents.handler and getJobEvents.handler routes, SST will create lambdas at those handler paths. The API lambdas get their DynamoDB permissions either through the permissions: [table] default shown above or via api.attachPermissions([table]).
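As a minimal sketch, the attachPermissions form would look like this (equivalent in effect to the permissions: [table] default used in the Api code above):

// Grants DynamoDB access to every function created by the Api construct,
// the same way bus.attachPermissions and socketApi.attachPermissions are used elsewhere.
api.attachPermissions([table]);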
Lambda: EventTracker
The eventTracker lambda will end up writing two DynamoDB records per event. This could probably be optimized down to one record and a GSI, but this approach ends up being pretty simple.
The first record is the "LATEST" record. The getLatestEvents endpoint will return a list of these so you can pick from a list of jobs. We use the partition key (PK) LATEST along with an event identifier (which ends up being the same as the PK of the other item). We also add a time to live (TTL) of a day so the database ends up cleaning itself up. You could potentially store less data in the LATEST record, but I kept it consistent with the regular event record.
const time = `${new Date().getTime()}`;
const detail = { ...event.detail };
const latestParams = {
  TableName: process.env.tableName,
  Item: {
    pk: `LATEST`,
    sk: `EVENT#${parseExecution(event.detail.execution)}`,
    latest: time,
    account: event.account,
    source: event.source,
    detailType: event['detail-type'],
    ttl: Math.ceil((new Date().getTime() + 24 * 60 * 60 * 1000) / 1000),
    ...detail,
  },
};
await dynamoDb.put(latestParams as PutItemInput).promise();
For the actual storage record, we store it under a PK tied to the job that executed. Here event.detail.execution is the state machine's execution ARN. We included this execution id in every event that was emitted in the previous project. These records will be retrieved by execution id for the getJobEvents endpoint.
const params = {
  TableName: process.env.tableName,
  Item: {
    pk: `EVENT#${parseExecution(event.detail.execution)}`,
    sk: time,
    account: event.account,
    source: event.source,
    detailType: event['detail-type'],
    ttl: Math.ceil((new Date().getTime() + 24 * 60 * 60 * 1000) / 1000),
    ...detail,
  },
};
await dynamoDb.put(params as PutItemInput).promise();
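Both snippets assume a parseExecution helper that isn't shown here. Based on the execution ARN layout described later in the post (the last segment is the job id), a minimal sketch might look like this, treated as an assumption rather than the repo's exact implementation:

// Hypothetical helper: pull the job id (the last ARN segment) out of the
// state machine execution ARN so it can be used in the partition key.
const parseExecution = (executionArn: string): string => {
  const parts = executionArn.split(":");
  return parts[parts.length - 1];
};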
Lambda: GetLatestEvents
getLatestEvents simply returns a list of the LATEST records:
const latestJobs = await dynamoDb
  .query({
    TableName,
    KeyConditionExpression: "#pk = :pk",
    ExpressionAttributeNames: {
      "#pk": "pk",
    },
    ExpressionAttributeValues: {
      ":pk": "LATEST",
    },
  })
  .promise();
return { statusCode: 200, body: JSON.stringify({ latest: latestJobs.Items }) };
Lambda: GetJobEvents
While getJobEvents returns the job-specific records:
const jobEvents = await dynamoDb
  .query({
    TableName,
    KeyConditionExpression: "#pk = :pk",
    ExpressionAttributeNames: {
      "#pk": "pk",
    },
    ExpressionAttributeValues: {
      ":pk": `EVENT#${event.pathParameters!.id}`,
    },
  })
  .promise();
console.log(JSON.stringify(jobEvents));
return { statusCode: 200, body: JSON.stringify({ events: jobEvents.Items }) };
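To sanity-check the Rest API you can hit both routes from any HTTP client. Here's a hypothetical sketch using fetch (the API URL is a placeholder for the endpoint SST prints on deploy):

// Hypothetical client-side check; replace apiUrl with your deployed Api endpoint.
const apiUrl = "https://example.execute-api.us-east-1.amazonaws.com";

// List the LATEST records so we can pick a job...
const latest = await (await fetch(`${apiUrl}/latest`)).json();

// ...then fetch the full event history for one job id.
const jobId = latest.latest[0].sk.replace("EVENT#", "");
const events = await (await fetch(`${apiUrl}/job/${jobId}`)).json();
console.log(events);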
Finishing up the RestAPI
If we run npm start, SST will deploy a debug stack along with the actual code. After it's deployed, if we run the CLI command from part 1 to start an event driven step function... we'll start seeing items populate in the DynamoDB table that was created.
Websocket API
The Websocket API has a similar construct and will create the remaining two lambda functions via routes:
const socketApi = new WebSocketApi(stack, "SocketApi", {
  defaults: {
    function: {
      environment: {
        tableName: table.tableName,
      },
    },
  },
  routes: {
    $connect: "functions/connect.handler",
    $disconnect: "functions/disconnect.handler",
  },
});
socketApi.attachPermissions([table]);
We'll re-use the same DynamoDB table for storing connected clients.
Lambda: Connect
The connect lambda will put a client connection item in the database:
const params = {
  TableName: process.env.tableName,
  Item: {
    pk: `CONNECTION`,
    sk: event.requestContext.connectionId,
    ttl: Math.ceil((new Date().getTime() + 24 * 60 * 60 * 1000) / 1000),
  },
};
await dynamoDb.put(params as PutItemInput).promise();
Lambda: Disconnect
The disconnect lambda will delete connections:
const params = {
  TableName: process.env.tableName,
  Key: {
    pk: `CONNECTION`,
    sk: event.requestContext.connectionId,
  },
};
await dynamoDb.delete(params as DeleteItemInput).promise();
Lambda: SendMessage
And the sendMessage lambda will query all of the connections from DynamoDB:
const messageData = JSON.stringify({
  pk: `EVENT#${parseExecution(event.detail.execution)}`,
  sk: `${new Date().getTime()}`,
  account: event.account,
  source: event.source,
  detailType: event['detail-type'],
  ...event.detail,
});

// Get all the connections
const connections = await dynamoDb
  .query({
    TableName,
    ProjectionExpression: "sk",
    KeyConditionExpression: "#pk = :pk",
    ExpressionAttributeNames: {
      "#pk": "pk",
    },
    ExpressionAttributeValues: {
      ":pk": "CONNECTION",
    },
  })
  .promise();
and then use those connections along with the AWS SDK's ApiGatewayManagementApi to send the event to the connected clients. If a connection responds with a 410 (the connection is gone), it removes that client from DynamoDB.
const apiG = new ApiGatewayManagementApi({
  endpoint: `${websocketUrl.replace("wss://", "")}`,
});

const postToConnection = async function ({ sk }: { sk: string }) {
  console.log(`connection id: ${sk}`);
  try {
    // Send the message to the given client
    await apiG
      .postToConnection({ ConnectionId: sk, Data: messageData })
      .promise();
  } catch (e: any) {
    console.log(`caught: ${e}`);
    if (e.statusCode === 410) {
      // Remove stale connections
      await dynamoDb
        .delete({ TableName, Key: { pk: "CONNECTION", sk } })
        .promise();
    }
  }
};

// Iterate through all the connections
await Promise.all(
  (connections?.Items! as { sk: string }[]).map(postToConnection)
);
Wrapping up the Websocket API
SST watches the directory structure and will continue to deploy things as changes are made. Depending on the changes, it may ask you for confirmation. If you don't have npm start running, run it now, or confirm the changes.
With the changes made, you can use a WebSocket tester to connect to your WebSocket API, and if you kick off another step function in the part 1 project you'll see the events in the logs.
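If you'd rather script it than use a tester, a minimal client sketch might look like this (the endpoint URL is a placeholder; the message shape matches what the sendMessage lambda builds):

// Hypothetical WebSocket client: connect and log each event pushed by sendMessage.
const socketUrl = "wss://example.execute-api.us-east-1.amazonaws.com/dev"; // placeholder
const ws = new WebSocket(socketUrl);

ws.onopen = () => console.log("connected, waiting for events...");
ws.onmessage = (msg) => {
  // messageData is a JSON string containing pk, sk, account, source,
  // detailType, and the original event detail fields.
  const observed = JSON.parse(msg.data as string);
  console.log(observed.detailType, observed);
};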
Creating a useful event interface
All of this is great, but raw events don't tell us much on their own. Or do they? 🤔
An event by itself won't tell us what emitted it, or what caused the thing that emitted it to emit it.
BUT we have access to that information. Each lambda in our event driven step function is essentially wrapped in an incoming and outgoing event. And the lambdas that are doing the work have access to these events... so we can use them.
By creating a common event metadata payload we can get enough information to paint a picture of the architecture and timelines. I extended part 1's putEvent utility to add in that incoming event metadata, so now each event includes what started it along with what it's trying to accomplish. Lambda functions also have an environment variable called AWS_LAMBDA_FUNCTION_NAME, which is the lambda function's name. By updating this meta each time we emit an event from a lambda, we can gather all the information we need. We can even fill in the gaps left by step function events that don't have that fidelity.
const execution = event.detail.execution.split(":");
return {
  ...event.detail,
  execution: event.detail.execution, // arn of the state machine job
  meta: {
    incoming: {
      account: event.account,
      source: event.source,
      detailType: event["detail-type"],
    },
    outgoing: {
      source: process.env.EVENT_SOURCE,
      detailType,
    },
    fn: process.env.AWS_LAMBDA_FUNCTION_NAME,
    stateMachine: execution[execution.length - 2], // 2nd to last is the state machine itself
    job: execution[execution.length - 1], // last is the job id
  },
};
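Spelled out as types, the enriched event detail looks roughly like this. This is a sketch based on the payload above; the interface names are mine, not from the repo:

// Hypothetical types describing the metadata added to every emitted event.
interface EventEndpoint {
  account?: string;
  source?: string;
  detailType?: string;
}

interface EventMeta {
  incoming: EventEndpoint; // the event that triggered this lambda
  outgoing: EventEndpoint; // the event this lambda is emitting
  fn?: string; // AWS_LAMBDA_FUNCTION_NAME of the emitting lambda
  stateMachine?: string; // parsed from the execution ARN
  job?: string; // the job id, also parsed from the execution ARN
}

interface ObservedDetail {
  execution: string; // ARN of the state machine job
  meta: EventMeta;
  [key: string]: unknown; // the rest of the original detail fields
}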
I also created an SST ViteStaticSite for the React app that acts as a client for the Rest and WebSocket APIs:
const site = new ViteStaticSite(stack, "SvelteJSSite", {
  path: "frontend",
  environment: {
    // Pass in the API endpoint to our app
    VITE_APP_API_URL: api.url,
    VITE_SOCKET_API_URL: socketApi.url,
  },
});
This React app uses D3 to create the charts and does some post-processing to massage the event data into the demonstration below. I am by no means a D3 expert and the code is lengthy, so I'm going to hand-wave it. The important part is to encode the data you need into the event payloads so that you can stitch together a picture without additional IAM access.
Demonstration
With the information we encoded in the events we can figure out how long the lambdas ran, which lambdas were invoked by which events, and how everything was chained together. We can process this as the events flow in.
Note: I added a random delay to the part 1 putEvent to make these graphs more interesting. They're very fast without the delay 😅
A Timeline of Events
Since we know the start and stop time of the lambdas we can build a timeline of events. Here we can see the interactions between things like the getUserCookie events and the subsequent steps in the state machine.
In this image the pink bars are the async side-effect actions. You can see that the generate report step started after the last purple (?) bar for the intermediate step... while the async step continued.
These charts can be customized to change the color coding or how the lambdas are labeled. In the timeline I chose to label each bar with the event that started the action, but they could be named with the ending event, the function name, or anything else we have the data for. In the tooltip I created, I included more metadata.
Inferring Architecture
We also have enough data that we can infer the architecture of everything interacting with the default event bus (provided they have the metadata we want).
Here we combine the incoming and outgoing events along with the lambda function names in the metadata to figure out what things are connected to each other.
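As a rough illustration of that idea, a sketch that derives architecture edges from the stored events could look like this (the ObservedEvent shape is an assumption based on the records the eventTracker lambda writes):

// Hypothetical shape of one stored event record (see the eventTracker lambda).
interface ObservedEvent {
  detailType: string;
  meta?: {
    incoming: { detailType?: string };
    outgoing: { detailType?: string };
    fn?: string;
  };
}

// Build "incoming event -> lambda -> outgoing event" edges from the metadata.
const inferEdges = (events: ObservedEvent[]): Set<string> => {
  const edges = new Set<string>();
  for (const e of events) {
    const fn = e.meta?.fn;
    if (!fn) continue; // step function events may not carry function metadata
    const incoming = e.meta?.incoming?.detailType;
    if (incoming) {
      edges.add(`${incoming} -> ${fn}`);
    }
    edges.add(`${fn} -> ${e.meta?.outgoing?.detailType ?? e.detailType}`);
  }
  return edges;
};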
What-if
These diagrams were created in a day, with limited D3 experience, and ONLY from event data. What if we extended this with X-Ray, CloudWatch, or endpoints with access to get information about the actual state machines?
What if we tied in something like OpenTelemetry... there's a lot of potential here for a decoupled Event Driven Architecture observer.
Hit me up on Twitter @martzcodes if you've got ideas on how YOU would extend this or what other information you could figure out only from using events. 🍻