Wojciech Matuszewski for AWS Community Builders


Amazon S3 to AWS StepFunctions pattern

Amazon S3 is easily my top pick from the vast catalog of different AWS services. Considering its features and the consistency guarantees, it really is an engineering marvel.

Since Amazon S3 is often used as a starting point for various workloads, one might need to integrate S3 events with a workflow orchestration service - like AWS StepFunctions.

This blog post will discuss ways developers might wire the Amazon S3 and AWS StepFunctions together to achieve highly scalable workflows.

Let us dive in.


All the code used in this blog post can be found in this GitHub repository.

Using S3 Notifications and AWS Lambda function

Depending on your use case, this might be the most straightforward option available to you. The integration is well documented and taught in most AWS-related courses.

S3 Notification with AWS Lambda

AWS CDK makes setting up this architecture a breeze.

const machine = new sfn.StateMachine(this, "machine", {
  definition: new sfn.Pass(this, "pass")
});

const bucket = new s3.Bucket(this, "bucket", {
  removalPolicy: cdk.RemovalPolicy.DESTROY
});

/**
 * Handler is responsible for forwarding the S3 event to the state machine.
 */
const notificationHandler = new nodeJsLambda.NodejsFunction(
  this,
  "notificationHandler",
  {
    handler: "handler",
    entry: join(__dirname, "notification-handler.ts"),
    environment: {
      STATE_MACHINE_ARN: machine.stateMachineArn
    }
  }
);
notificationHandler.addToRolePolicy(
  new iam.PolicyStatement({
    actions: ["states:StartExecution"],
    resources: [machine.stateMachineArn],
    effect: iam.Effect.ALLOW
  })
);

bucket.addEventNotification(
  s3.EventType.OBJECT_CREATED_PUT,
  new s3Notifications.LambdaDestination(notificationHandler),
  { prefix: "uploads/" }
);
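The handler itself is not shown above. Here is a minimal sketch of what `notification-handler.ts` might do, assuming it turns each S3 record into one `StartExecution` request. The `toStartExecutionInputs` helper and the `ExecutionRequest` type are hypothetical names, and the actual SDK call is left out so the mapping can be read and tested on its own.

```typescript
// Minimal subset of the S3 notification event shape used in this sketch.
interface S3Record {
  eventName: string;
  s3: {
    bucket: { name: string };
    object: { key: string };
  };
}

interface S3Event {
  Records: S3Record[];
}

// Hypothetical shape: mirrors the two StartExecution parameters needed here.
interface ExecutionRequest {
  stateMachineArn: string;
  input: string;
}

// S3 URL-encodes object keys in notifications and uses "+" for spaces.
function decodeKey(rawKey: string): string {
  return decodeURIComponent(rawKey.replace(/\+/g, " "));
}

// One execution request per S3 record, carrying only the bucket and key.
function toStartExecutionInputs(
  event: S3Event,
  stateMachineArn: string
): ExecutionRequest[] {
  return event.Records.map((record) => ({
    stateMachineArn,
    input: JSON.stringify({
      bucket: record.s3.bucket.name,
      key: decodeKey(record.s3.object.key)
    })
  }));
}
```

In the deployed function, each request would be handed to the Step Functions client, with `stateMachineArn` read from the `STATE_MACHINE_ARN` environment variable set in the CDK code above.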

Other prolific authors from the AWS community have written many articles regarding this pattern, so I will not repeat their words here. However, I will remind you about some limitations and gotchas that you might encounter while working with S3 Notifications.

  • Make sure you are not going to create an infinite event loop. This often happens whenever the target AWS Lambda function causes an S3 event that triggers it again.

  • Keep in mind the limitations of the S3 Notifications event filtering rules. Here is a link to the relevant AWS documentation page.

  • Keep in mind the delivery guarantees. This point is worth considering regardless of the service. I've read countless StackOverflow questions asking about duplicate events. Consult this documentation piece regarding event delivery guarantees.
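Two of the gotchas above, event loops and duplicate deliveries, can be guarded against in the forwarding code. The following is a minimal sketch, assuming the handler only accepts keys under `uploads/` and names each execution after the object key and the record's `sequencer` value, so that a redelivered event maps to the same execution name. `shouldForward`, `executionNameFor`, and the FNV-1a hash are illustrative choices, not part of the original code; a production handler would more likely use a cryptographic hash.

```typescript
interface S3ObjectRecord {
  s3: {
    bucket: { name: string };
    object: { key: string; sequencer: string };
  };
}

// Only forward objects under the watched prefix. Anything the workflow
// writes back should land elsewhere so it cannot re-trigger the handler.
function shouldForward(record: S3ObjectRecord, prefix: string = "uploads/"): boolean {
  return record.s3.object.key.startsWith(prefix);
}

// 32-bit FNV-1a, used only to keep the sketch dependency-free.
// It is NOT collision-resistant.
function fnv1a(input: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash.toString(16).padStart(8, "0");
}

// Deterministic execution name: the same bucket, key, and sequencer
// always produce the same name.
function executionNameFor(record: S3ObjectRecord): string {
  const { bucket, object } = record.s3;
  const identity = `${bucket.name}/${object.key}/${object.sequencer}`;
  // Execution names allow a limited character set and at most 80 characters.
  const readable = object.key.replace(/[^A-Za-z0-9_-]/g, "_").slice(0, 60);
  return `${readable}-${fnv1a(identity)}`;
}
```

For Standard workflows, starting a second execution with a name that is already in use does not create a new run, which is what makes a deterministic name useful as a deduplication key.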

Applying the storage-first pattern

In the AWS Serverless space, more and more engineers are talking about the storage-first pattern, where the data/event is first persisted and then reacted upon. This "store first and react later" approach makes it easy to handle heavy loads (through event batching) and ensures resiliency.

S3 Notifications storage first

Luckily for us, Amazon S3 integrates with Amazon SQS, which makes augmenting our current infrastructure a breeze.

const s3EventsQueue = ...

const notificationHandler = ...

notificationHandler.addEventSource(
  new lambdaEventSources.SqsEventSource(s3EventsQueue, {
    /**
     * Increase this number as you please.
     */
    batchSize: 1,
    enabled: true
  })
);

bucket.addEventNotification(
  s3.EventType.OBJECT_CREATED_PUT,
  // Previously we were targeting the Lambda directly.
  new s3Notifications.SqsDestination(s3EventsQueue),
  { prefix: "uploads/" }
);

It is essential to handle errors correctly in the poller AWS Lambda function. Remember that, by default, if your Lambda function throws an error, the whole batch of messages will be re-queued. I have yet to discover a better way of handling this than manually deleting messages once they are processed.

If you are interested in the implementation, here is mine.
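As the discussion under this post notes, Lambda's SQS event source gained partial batch responses shortly after this was written. Here is a minimal sketch of that alternative to manual deletion, assuming the event source mapping has `ReportBatchItemFailures` enabled. `processBatch` and the `processRecord` callback are hypothetical names, and the function is kept synchronous so it is easy to test.

```typescript
interface SqsMessage {
  messageId: string;
  body: string; // JSON-encoded S3 notification
}

interface SqsBatch {
  Records: SqsMessage[];
}

interface S3Notification {
  Records?: { s3: { bucket: { name: string }; object: { key: string } } }[];
}

interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

type S3RecordHandler = (bucket: string, key: string) => void;

function processBatch(batch: SqsBatch, processRecord: S3RecordHandler): BatchResponse {
  const batchItemFailures: { itemIdentifier: string }[] = [];

  for (const message of batch.Records) {
    try {
      const notification: S3Notification = JSON.parse(message.body);
      // S3 sends an "s3:TestEvent" message without Records when the
      // notification is first configured; it is treated as a no-op here.
      for (const record of notification.Records ?? []) {
        processRecord(record.s3.bucket.name, record.s3.object.key);
      }
    } catch {
      // Report only this message; messages not listed here are
      // considered processed and are removed from the queue.
      batchItemFailures.push({ itemIdentifier: message.messageId });
    }
  }

  return { batchItemFailures };
}
```

With this response shape, one bad message no longer forces the whole batch to be recomputed; only the listed message IDs become visible in the queue again.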

Using AWS CloudTrail and Amazon EventBridge

For the longest time, I was confident that the only way of integrating Amazon S3 with AWS StepFunctions was through S3 Notifications utilizing an intermediate AWS Lambda function.

I rejoiced when, while exploring AWS CloudTrail, I learned about EventSelectors and AdvancedEventSelectors. These would allow me to wire Amazon S3 and Amazon EventBridge together to achieve a fully "lambda-less" architecture.

S3 Notification with AWS CloudTrail

The following sections touch on the event selectors rather than the AWS CloudTrail service itself. It would be best if you were familiar with AWS CloudTrail before proceeding.

What are AWS CloudTrail event selectors

AWS CloudTrail event selectors are nothing more than filtering rules one might apply to configure which events get captured by a given trail. It is vital to note that the event selectors only apply to data events. To the best of my knowledge, you cannot filter management events (only turn them off and on).

CloudTrail event selector

Update 24.11.2021

As my colleague @rosswilliams pointed out, before embarking on the CloudTrail implementation journey, keep in mind that AWS CloudTrail might deliver your events with a delay of up to 15 minutes. In my personal experience, the delay is usually insignificant and events arrive almost instantly, but that is not guaranteed.

For reference, head to the AWS CloudTrail FAQ, mainly the "Event payload, Timelines, and Delivery Frequency" section.

Utilizing the EventSelectors

The "regular" AWS CloudTrail event selectors allow you to track read and/or write events originating from a specific bucket. The list of all possible events is rather extensive.

The following is an example of how one might create an AWS CloudTrail event selector for an Amazon S3 bucket using AWS CDK.

const bucket = new s3.Bucket(this, "bucket", {
  removalPolicy: cdk.RemovalPolicy.DESTROY
});

const trail = new cloudTrail.Trail(this, "trail", {
  includeGlobalServiceEvents: false,
  isMultiRegionTrail: false
});

trail.addS3EventSelector([{ bucket, objectPrefix: "uploads" }], {
  includeManagementEvents: false,
  readWriteType: cloudTrail.ReadWriteType.WRITE_ONLY
});

Just like in the case of S3 Notifications, we can specify the prefix parameter. Unlike the S3 Notifications, we cannot specify the suffix parameter.

Combined with the broad scope of WRITE_ONLY events, this setup forces us to do much of our filtering on the Amazon EventBridge side of things. Luckily for us, the service we are sending events to is designed with rich filtering capabilities in mind.

The following code creates an Amazon EventBridge rule matching Amazon S3 events.

const bucket = ...

const machine = ...

const assetUploadedRule = new events.Rule(this, "assetUploadedRule", {
  enabled: true,
  eventPattern: {
    source: ["aws.s3"],
    detail: {
      eventSource: ["s3.amazonaws.com"],
      eventName: ["PutObject"],
      requestParameters: {
        bucketName: [bucket.bucketName]
      }
    }
  },
  targets: [new eventsTargets.SfnStateMachine(machine)]
});

And that's it! All the PutObject events originating from the bucket you have applied the EventSelector on will be pushed to Amazon EventBridge and cause state machine invocation.
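Since the "regular" selector cannot filter by suffix, that kind of narrowing has to live in the rule's pattern. Here is a sketch of the raw event pattern with an added key-suffix filter. It assumes that CloudTrail's `requestParameters.key` field carries the object key and that suffix matching is available in EventBridge patterns for your account; the bucket name and the `.png` suffix are placeholders.

```typescript
// Raw EventBridge event pattern, written as a plain object.
// In CDK, the "detail" part would go under `eventPattern.detail`.
const pngUploadedPattern = {
  source: ["aws.s3"],
  "detail-type": ["AWS API Call via CloudTrail"],
  detail: {
    eventSource: ["s3.amazonaws.com"],
    eventName: ["PutObject"],
    requestParameters: {
      // Placeholder bucket name; the CDK code above uses bucket.bucketName.
      bucketName: ["my-upload-bucket"],
      // Assumption: only objects whose key ends with ".png" should match.
      key: [{ suffix: ".png" }]
    }
  }
};
```

Note that several matchers listed for the same field are combined with OR, so putting a prefix and a suffix matcher side by side under `key` would match either one, not both.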

Utilizing the AdvancedEventSelectors

The "advanced" AWS CloudTrail event selectors offer more sophisticated event filtering capabilities. This is a good thing because every data event incurs AWS CloudTrail cost. Utilizing the advanced event selectors is an excellent way to optimize your CloudTrail costs.

The bad news is that, at the time of writing, the advanced event selectors are not supported via AWS CloudFormation, making the AWS CDK code a bit more involved.

The following code creates an AdvancedEventSelector using custom resources.

const trail = new cloudTrail.Trail(this, "trail", {
  includeGlobalServiceEvents: false,
  isMultiRegionTrail: false
});
const cfnTrail = trail.node.defaultChild as cloudTrail.CfnTrail;

const advancedEventSelectorResource = new customResources.AwsCustomResource(
  this,
  "advancedEventSelectorResource",
  {
    onCreate: {
      action: "putEventSelectors",
      service: "CloudTrail",
      parameters: {
        AdvancedEventSelectors: [
          {
            FieldSelectors: [
              { Field: "eventCategory", Equals: ["Data"] },
              { Field: "resources.type", Equals: ["AWS::S3::Object"] },
              { Field: "eventName", Equals: ["PutObject"] },
              {
                Field: "resources.ARN",
                StartsWith: [bucket.arnForObjects("uploads/")]
              }
            ],
            Name: "Listens to the PutObject only"
          }
        ],
        TrailName: cfnTrail.ref
      },
      physicalResourceId:
        customResources.PhysicalResourceId.fromResponse("TrailARN")
    },
    onDelete: {
      action: "putEventSelectors",
      service: "CloudTrail",
      parameters: {
        AdvancedEventSelectors: [],
        TrailName: cfnTrail.ref
      }
    },
    policy: customResources.AwsCustomResourcePolicy.fromSdkCalls({
      resources: [trail.trailArn]
    })
  }
);
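To make the `resources.ARN` filter above more concrete, here is a small sketch that builds the same field selectors from a plain bucket name. `putObjectSelectors` is a hypothetical helper. The ARN format assumes the standard `aws` partition, which is what `bucket.arnForObjects("uploads/")` is expected to resolve to at deploy time in commercial regions.

```typescript
interface FieldSelector {
  Field: string;
  Equals?: string[];
  StartsWith?: string[];
}

// Builds selectors that capture only PutObject data events
// for objects under the given key prefix.
function putObjectSelectors(bucketName: string, keyPrefix: string): FieldSelector[] {
  return [
    { Field: "eventCategory", Equals: ["Data"] },
    { Field: "resources.type", Equals: ["AWS::S3::Object"] },
    { Field: "eventName", Equals: ["PutObject"] },
    {
      Field: "resources.ARN",
      // Assumes the "aws" partition; other partitions use a different ARN prefix.
      StartsWith: [`arn:aws:s3:::${bucketName}/${keyPrefix}`]
    }
  ];
}
```

Because the `eventName` condition is applied by CloudTrail itself, other write events on the bucket are never recorded by the trail, which is where the cost savings come from.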

The Amazon EventBridge rule declaration has not changed.

const bucket = ...

const machine = ...

const assetUploadedRule = new events.Rule(this, "assetUploadedRule", {
  enabled: true,
  eventPattern: {
    source: ["aws.s3"],
    detail: {
      eventSource: ["s3.amazonaws.com"],
      eventName: ["PutObject"],
      requestParameters: {
        bucketName: [bucket.bucketName]
      }
    }
  },
  targets: [new eventsTargets.SfnStateMachine(machine)]
});
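With either selector type, the state machine is started with the matched EventBridge event as its input, so the first state usually needs to dig out the bucket and key. Here is a sketch of that extraction, assuming the standard "AWS API Call via CloudTrail" envelope. `uploadedObjectFrom` is a hypothetical helper, and only the fields used here are typed.

```typescript
// Subset of the EventBridge envelope for a CloudTrail-recorded S3 call.
interface CloudTrailS3Event {
  "detail-type": string;
  source: string;
  detail: {
    eventName: string;
    eventTime: string;
    requestParameters: { bucketName: string; key: string };
  };
}

interface UploadedObject {
  bucket: string;
  key: string;
  uploadedAt: string;
}

// Pulls the object location and the API call time out of the event.
function uploadedObjectFrom(event: CloudTrailS3Event): UploadedObject {
  const { requestParameters, eventTime } = event.detail;
  return {
    bucket: requestParameters.bucketName,
    key: requestParameters.key,
    uploadedAt: eventTime
  };
}
```

Inside the state machine definition itself, the same values can be reached with JSONPath expressions such as `$.detail.requestParameters.key`, without any Lambda function.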

If I were to choose between the "regular" and "advanced" event selectors, I would personally be inclined to pick the latter - mainly due to cost savings.

Closing words

There you have it. These were two ways one might integrate Amazon S3 events with an AWS StepFunctions state machine. Of course, in the world of AWS, there are probably many more ways to do this. I wanted to point you towards the ones I find very useful.

As always, thank you for your valuable time.

You can find me on Twitter - @wm_matuszewski

Discussion (8)

Sheen Brisals

Good write-up. As commented, CloudTrail is probably an overkill unless there is a specific reason. In most cases, the lambda trigger should fit. If not, SNS or SQS trigger would help.
If you know the S3 object key pattern, then the prefix/suffix based event notification is handy.
In the SQS to Lambda diagram above, not sure if you recommend manual polling for messages from the queue. If so, you need to take care of deleting the messages. Using SQS as the event source for lambda would make that simple and easy. You get to choose the batch size and can adjust the SQS params to make it work the way required.

Wojciech Matuszewski Author

Hey @sheenbrisals .

Regarding the SQS -> Lambda integration (via EventSourceMapping or manual polling) - if the function throws an error, the whole batch of messages will be re-queued, to the best of my knowledge. This might cause a problem where one bad message causes the entire batch to get re-computed repeatedly.

The pattern of manually deleting the SQS messages is also implemented in Lambda Powertools for Python

Sheen Brisals

Thanks! I see your point. Yes, people use that pattern to overcome that issue.

Wojciech Matuszewski Author

Wow, just as I finished writing this comment, AWS released aws.amazon.com/about-aws/whats-new....

It seems like the approach of manually deleting the SQS messages might no longer be needed. I still need to experiment with the new way ESM handles SQS batches, though.

rosswilliams

if your Lambda function throws an error, the whole batch of messages will be re-queued

On 23/11/2021 AWS enabled the ability for Lambdas to send partial batch responses to SQS. Must be tough writing guides around re:Invent time

Wojciech Matuszewski Author

Yup :D At least I have an idea for another blog post!

rosswilliams

One downside to CloudTrail is that events may take 15 minutes to reach EventBridge. The SQS + Lambda solution can use a DLQ and an alarm on the DLQ for message age to alert any issues in the pipeline.

Wojciech Matuszewski Author

Right! I always forget about the CloudTrail delay. I will update the article as soon as I have some free time.

Thank you for reminding me about this @rosswilliams .