Amazon S3 is easily my top pick from the vast catalog of different AWS services. Considering its features and the consistency guarantees, it really is an engineering marvel.
Since Amazon S3 is often the starting point for a workload, you might need to integrate S3 events with a workflow orchestration service such as AWS Step Functions.
This blog post discusses ways developers might wire Amazon S3 and AWS Step Functions together to achieve highly scalable workflows.
Let us dive in.
All the code used in this blog post can be found in this GitHub repository.
Using S3 Notifications and an AWS Lambda function
Depending on your use case, this might be the most straightforward option available to you. The integration is well documented and taught in most AWS-related courses.
AWS CDK makes setting up this architecture a breeze.
const machine = new sfn.StateMachine(this, "machine", {
  definition: new sfn.Pass(this, "pass")
});
const bucket = new s3.Bucket(this, "bucket", {
  removalPolicy: cdk.RemovalPolicy.DESTROY
});
/**
 * Handler is responsible for forwarding the S3 event to the state machine.
 */
const notificationHandler = new nodeJsLambda.NodejsFunction(
  this,
  "notificationHandler",
  {
    handler: "handler",
    entry: join(__dirname, "notification-handler.ts"),
    environment: {
      STATE_MACHINE_ARN: machine.stateMachineArn
    }
  }
);
notificationHandler.addToRolePolicy(
  new iam.PolicyStatement({
    actions: ["states:StartExecution"],
    resources: [machine.stateMachineArn],
    effect: iam.Effect.ALLOW
  })
);
bucket.addEventNotification(
  s3.EventType.OBJECT_CREATED_PUT,
  new s3Notifications.LambdaDestination(notificationHandler),
  { prefix: "uploads/" }
);
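The CDK snippet points at a `notification-handler.ts` file that is not shown here. As a minimal sketch of what its core could look like, assuming the state machine only needs the bucket name and object key, the forwarding step can be written as a pure mapping from the S3 event to `StartExecution` parameters. The type names and the `toStartExecutionParams` helper are hypothetical, and the actual call to the Step Functions API (for example through the AWS SDK) is deliberately left out so the block stays dependency-free.

```typescript
// Subset of the S3 notification payload that this sketch relies on.
interface S3EventRecord {
  eventName: string;
  s3: { bucket: { name: string }; object: { key: string } };
}
interface S3Event {
  Records: S3EventRecord[];
}

// The two StartExecution parameters this sketch fills in.
interface StartExecutionParams {
  stateMachineArn: string;
  input: string;
}

// Hypothetical helper: maps every record of an S3 event to one StartExecution request.
function toStartExecutionParams(
  event: S3Event,
  stateMachineArn: string
): StartExecutionParams[] {
  return event.Records.map((record) => ({
    stateMachineArn,
    input: JSON.stringify({
      bucket: record.s3.bucket.name,
      // S3 URL-encodes object keys in notifications and uses "+" for spaces.
      key: decodeURIComponent(record.s3.object.key.replace(/\+/g, " ")),
      eventName: record.eventName
    })
  }));
}
```

In a real handler, each returned element would be passed to a `StartExecution` call, with `stateMachineArn` read from the `STATE_MACHINE_ARN` environment variable set in the CDK code above.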
Other prolific authors from the AWS community have written many articles regarding this pattern, so I will not repeat their words here. However, I will remind you about some limitations and gotchas that you might encounter while working with S3 Notifications.
Make sure you do not create an infinite event loop. This happens when the target AWS Lambda function itself causes an S3 event that matches the notification filter, so the function triggers itself again.
Keep in mind the limitations of the S3 Notifications event filtering rules. Here is a link to the relevant AWS documentation page.
Keep in mind the delivery guarantees. This point is worth considering regardless of the service. I've read countless Stack Overflow questions asking about duplicate events. Consult this documentation piece regarding event delivery guarantees.
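One way to soften the duplicate-event problem, sketched here under a few assumptions, is to derive the execution name deterministically from the S3 record. For Standard workflows, Step Functions does not start a second execution under a name that is already in use (it either returns the running execution or rejects the call with `ExecutionAlreadyExists`), so a redelivered event would not produce a second run. The `executionNameFor` helper and the `S3ObjectRef` type are hypothetical; `sequencer` is the field of that name found on the `object` part of an S3 notification record.

```typescript
import { createHash } from "node:crypto";

// Hypothetical shape: the fields of an S3 notification record used for deduplication.
interface S3ObjectRef {
  bucket: string;
  key: string;
  sequencer: string;
}

// Returns a 64-character hex string, which fits the 80-character
// limit on execution names and contains no forbidden characters.
function executionNameFor(ref: S3ObjectRef): string {
  return createHash("sha256")
    // JSON-encode the parts so that different combinations cannot collide by concatenation.
    .update(JSON.stringify([ref.bucket, ref.key, ref.sequencer]))
    .digest("hex");
}
```

The resulting value would be passed as the `name` parameter of `StartExecution`. This is an assumption-laden sketch rather than a complete answer: Express workflows do not offer the same behavior, so they would need a different deduplication strategy.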
Applying the storage-first pattern
In the AWS Serverless space, more and more engineers are talking about the storage-first pattern, where the data/event is first persisted and then reacted upon. This "store first and react later" approach makes it easy to handle heavy loads (through event batching) and ensures resiliency.
Luckily for us, Amazon S3 integrates with Amazon SQS, making augmenting our current infrastructure a breeze.
const s3EventsQueue = ...
const notificationHandler = ...
notificationHandler.addEventSource(
  new lambdaEventSources.SqsEventSource(s3EventsQueue, {
    /**
     * Increase this number as you please.
     */
    batchSize: 1,
    enabled: true
  })
);
bucket.addEventNotification(
  s3.EventType.OBJECT_CREATED_PUT,
  // Previously we were targeting the Lambda directly.
  new s3Notifications.SqsDestination(s3EventsQueue),
  { prefix: "uploads/" }
);
It is essential to handle errors correctly in the poller AWS Lambda function. Remember that if your Lambda function throws an error, the whole batch of messages becomes visible in the queue again and is retried, including the messages that were processed successfully. I have yet to discover a better way of handling this than manually deleting messages when they are processed.
If you are interested in the implementation, here is mine.
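Since this post was first written, AWS Lambda has gained partial batch responses for SQS event sources, which offer an alternative to deleting messages by hand: the handler returns the IDs of the messages that failed, and only those are retried. The sketch below is not the implementation linked above. The helper names are hypothetical, processing is kept synchronous for brevity, and it assumes the event source mapping has `ReportBatchItemFailures` enabled (in CDK, the `reportBatchItemFailures` option of `SqsEventSource`).

```typescript
// Subset of the SQS event payload that this sketch relies on.
interface SqsRecord {
  messageId: string;
  body: string;
}
interface SqsEvent {
  Records: SqsRecord[];
}
// Response shape Lambda expects when partial batch responses are enabled.
interface SqsBatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Hypothetical helper: runs processRecord for each message and reports only the failures.
function collectBatchItemFailures(
  event: SqsEvent,
  processRecord: (record: SqsRecord) => void
): SqsBatchResponse {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of event.Records) {
    try {
      processRecord(record);
    } catch {
      // Only this message is retried; the rest of the batch is deleted.
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
}

// Hypothetical processing step: the SQS body carries the S3 notification as JSON.
function extractObjectKeys(record: SqsRecord): string[] {
  const body = JSON.parse(record.body) as {
    Records?: { s3: { object: { key: string } } }[];
  };
  // The initial "s3:TestEvent" message has no Records, hence the fallback.
  return (body.Records ?? []).map((r) => r.s3.object.key);
}
```

A real handler would await the `StartExecution` call inside `processRecord` and return the result of `collectBatchItemFailures` (made asynchronous) as its response.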
Using AWS CloudTrail and Amazon EventBridge
For the longest time, I was confident that the only way of integrating Amazon S3 with AWS Step Functions was through S3 Notifications utilizing an intermediate AWS Lambda function.
I rejoiced when, while exploring AWS CloudTrail, I learned about EventSelectors and AdvancedEventSelectors. These would allow me to wire Amazon S3 and Amazon EventBridge together to achieve a fully "lambda-less" architecture.
The following sections touch on the event selectors rather than the AWS CloudTrail service itself. It would be best if you were familiar with AWS CloudTrail before proceeding.
What are AWS CloudTrail event selectors
AWS CloudTrail event selectors are nothing more than filtering rules one might apply to configure which events get captured by a given trail. It is vital to note that the event selectors only apply to the data events. To the best of my knowledge, you cannot filter the management events (only turn them off and on).
Update 24.11.2021
As my colleague @rosswilliams pointed out, before embarking on the CloudTrail implementation journey, keep in mind that AWS CloudTrail might deliver your events with a delay of up to 15 minutes. In my personal experience the delay is usually insignificant and events arrive almost immediately, but that is an observation rather than a guarantee.
For reference, head to the AWS CloudTrail FAQ, mainly the "Event payload, Timelines, and Delivery Frequency" section.
Utilizing the EventSelectors
The "regular" AWS CloudTrail event selectors allow you to track read and/or write events originating from a specific bucket. The list of all possible events is rather extensive.
The following is an example of how one might create an AWS CloudTrail event selector for an Amazon S3 bucket using AWS CDK.
const bucket = new s3.Bucket(this, "bucket", {
  removalPolicy: cdk.RemovalPolicy.DESTROY
});
const trail = new cloudTrail.Trail(this, "trail", {
  includeGlobalServiceEvents: false,
  isMultiRegionTrail: false
});
trail.addS3EventSelector([{ bucket, objectPrefix: "uploads" }], {
  includeManagementEvents: false,
  readWriteType: cloudTrail.ReadWriteType.WRITE_ONLY
});
Just like in the case of S3 Notifications, we can specify the prefix parameter. Unlike the S3 Notifications, we cannot specify the suffix parameter.
Combined with the broad scope of WRITE_ONLY events, this setup forces us to do much of our filtering on the Amazon EventBridge side of things. Luckily for us, the service we are sending events to is designed with rich filtering capabilities in mind.
The following code creates an Amazon EventBridge rule matching Amazon S3 events.
const bucket = ...
const machine = ...
const assetUploadedRule = new events.Rule(this, "assetUploadedRule", {
  enabled: true,
  eventPattern: {
    source: ["aws.s3"],
    detail: {
      eventSource: ["s3.amazonaws.com"],
      eventName: ["PutObject"],
      requestParameters: {
        bucketName: [bucket.bucketName]
      }
    }
  },
  targets: [new eventsTargets.SfnStateMachine(machine)]
});
And that's it! All the PutObject events originating from the bucket you have applied the EventSelector on will be pushed to Amazon EventBridge and cause state machine invocation.
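Because the regular event selector is broad, the rule's event pattern is where further narrowing can happen. As a hedged sketch, the pattern could additionally match on the object key with EventBridge prefix matching. The `putObjectPattern` helper is hypothetical, and it assumes the CloudTrail `PutObject` event carries the object key under `requestParameters.key`.

```typescript
// Hypothetical helper: builds an event pattern that also filters on the object key.
// The returned object could be passed as `eventPattern` to `events.Rule`.
function putObjectPattern(bucketName: string, keyPrefix: string) {
  return {
    source: ["aws.s3"],
    detail: {
      eventSource: ["s3.amazonaws.com"],
      eventName: ["PutObject"],
      requestParameters: {
        bucketName: [bucketName],
        // EventBridge content filter: matches keys that start with keyPrefix.
        key: [{ prefix: keyPrefix }]
      }
    }
  };
}
```

With the bucket from the earlier snippets, a call such as `putObjectPattern(bucket.bucketName, "uploads/")` would replace the inline pattern in the rule definition.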
Utilizing the AdvancedEventSelectors
The "advanced" AWS CloudTrail event selectors offer more sophisticated event filtering capabilities. This is a good thing because every data event incurs AWS CloudTrail cost. Utilizing the advanced event selectors is an excellent way to optimize your CloudTrail costs.
The bad news is that, at the time of writing, the advanced event selectors are not supported via AWS CloudFormation, making the AWS CDK code a bit more involved.
The following code creates AdvancedEventSelector using custom resources.
const trail = new cloudTrail.Trail(this, "trail", {
  includeGlobalServiceEvents: false,
  isMultiRegionTrail: false
});
const cfnTrail = trail.node.defaultChild as cloudTrail.CfnTrail;
const advancedEventSelectorResource = new customResources.AwsCustomResource(
  this,
  "advancedEventSelectorResource",
  {
    onCreate: {
      action: "putEventSelectors",
      service: "CloudTrail",
      parameters: {
        AdvancedEventSelectors: [
          {
            FieldSelectors: [
              { Field: "eventCategory", Equals: ["Data"] },
              { Field: "resources.type", Equals: ["AWS::S3::Object"] },
              { Field: "eventName", Equals: ["PutObject"] },
              {
                Field: "resources.ARN",
                StartsWith: [bucket.arnForObjects("uploads/")]
              }
            ],
            Name: "Listens to the PutObject only"
          }
        ],
        TrailName: cfnTrail.ref
      },
      physicalResourceId:
        customResources.PhysicalResourceId.fromResponse("TrailARN")
    },
    onDelete: {
      action: "putEventSelectors",
      service: "CloudTrail",
      parameters: {
        // Same parameter name as in onCreate, with the selectors cleared.
        AdvancedEventSelectors: [],
        TrailName: cfnTrail.ref
      }
    },
    policy: customResources.AwsCustomResourcePolicy.fromSdkCalls({
      resources: [trail.trailArn]
    })
  }
);
The Amazon EventBridge rule declaration has not changed.
const bucket = ...
const machine = ...
const assetUploadedRule = new events.Rule(this, "assetUploadedRule", {
  enabled: true,
  eventPattern: {
    source: ["aws.s3"],
    detail: {
      eventSource: ["s3.amazonaws.com"],
      eventName: ["PutObject"],
      requestParameters: {
        bucketName: [bucket.bucketName]
      }
    }
  },
  targets: [new eventsTargets.SfnStateMachine(machine)]
});
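Whichever selector type is used, the state machine started by this rule receives the EventBridge envelope around the CloudTrail record, not the S3 notification payload from the first approach. The following sketch shows where the bucket and key would be found, assuming the `PutObject` record exposes them under `detail.requestParameters`. The `CloudTrailS3Event` type and the `toObjectRef` helper are hypothetical and cover only the fields used here.

```typescript
// Subset of an EventBridge event that wraps a CloudTrail S3 data event.
interface CloudTrailS3Event {
  source: string;
  "detail-type": string;
  detail: {
    eventName: string;
    requestParameters: { bucketName: string; key: string };
  };
}

// Hypothetical helper: pulls the object location out of the event envelope.
function toObjectRef(event: CloudTrailS3Event): { bucket: string; key: string } {
  const { bucketName, key } = event.detail.requestParameters;
  return { bucket: bucketName, key };
}
```

Inside the state machine itself, the same values would be addressed with paths such as `$.detail.requestParameters.bucketName` and `$.detail.requestParameters.key`.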
If I were to choose between the "regular" and "advanced" event selectors, I would personally be inclined to pick the latter - mainly due to cost savings.
Closing words
There you have it. These were a few ways one might integrate Amazon S3 events with an AWS Step Functions state machine. Of course, in the world of AWS, there are probably many more ways to do this. I wanted to point you towards the ones I find very useful.
As always, thank you for your valuable time.
You can find me on Twitter - @wm_matuszewski
Top comments (8)
Good write-up. As commented, CloudTrail is probably overkill unless there is a specific reason. In most cases, the Lambda trigger should fit. If not, an SNS or SQS trigger would help.
If you know the S3 object key pattern, then the prefix/suffix based event notification is handy.
In the SQS to Lambda diagram above, not sure if you recommend manual polling for messages from the queue. If so, you need to take care of deleting the messages. Using SQS as the event source for Lambda would make that simple and easy. You get to choose the batch size and can adjust the SQS params to make it work the way required.
Hey @sheenbrisals.
Regarding the SQS -> Lambda integration (via EventSourceMapping or manual polling) - if the function throws an error, the whole batch of messages will be re-queued, to the best of my knowledge. This might cause a problem where one bad message causes the entire batch to get re-computed repeatedly.
The pattern of manually deleting the SQS messages is also implemented in Lambda Powertools for Python
Thanks! I see your point. Yes, that is the pattern people use to overcome that issue.
Wow, just as I finished writing this comment, AWS released aws.amazon.com/about-aws/whats-new....
It seems like the approach of manually deleting the SQS messages might no longer be needed. I still need to experiment with the new way ESM handles SQS batches, though.
On 23/11/2021 AWS enabled the ability for Lambdas to send partial batch responses to SQS. Must be tough writing guides around re:Invent time
Yup :D At least I have an idea for another blog post!
One downside to CloudTrail is that events may take 15 minutes to reach EventBridge. The SQS + Lambda solution can use a DLQ and an alarm on the DLQ for message age to alert any issues in the pipeline.
Right! I always forget about the CloudTrail delay. I will update the article as soon as I have some free time.
Thank you for reminding me about this @rosswilliams.