TL;DR
This article provides a comprehensive guide to leveraging AWS Step Functions for efficient orchestration of serverless application workflows. We’ll delve into integrating these workflows with your application backends, ensuring that inputs are correctly processed and outputs are accurately returned. The focus is on simplifying workflow management in complex serverless environments and enhancing the reliability of data exchange within these processes.
Introduction
Managing workflows in serverless applications can become increasingly challenging as processes proliferate. Fortunately, AWS Step Functions offer a robust solution for orchestrating and managing these serverless processes efficiently.
In this article, we’ll demonstrate how to integrate these workflows seamlessly with your application backends. More importantly, we’ll focus on ensuring accurate handling and return of both inputs and outputs within these orchestrated workflows.
You can find all the sources of this project on GitHub:
Donovan1905 / apigw-sfn-integration
AWS API Gateway integration for Step Functions
The use cases
In modern cloud architectures, the need to efficiently manage complex workflows often leads to the integration of AWS Step Functions with API Gateway. This combination is particularly valuable when dealing with multi-step processes that require orchestration across various AWS services. By leveraging API Gateway to trigger Step Functions, developers can initiate intricate workflows through simple HTTP requests, streamlining the execution of tasks that are too complex or lengthy for traditional, single-step functions.
Because of the API Gateway timeout limit, you have to choose between two approaches: synchronous and asynchronous.
Synchronous
If you want your API response to return the final result of your Step Function execution, you need to use the Express workflow type. This type lets you call the states:StartSyncExecution action and wait for the output of your workflow. Although the timeout of an Express Step Function is much lower than that of a Standard one (5 minutes instead of 1 year), the execution is limited even further by the API Gateway timeout, which is 29 seconds.
So if your workflows are certain to finish in under 29 seconds and the output of the Step Function matters for your use case, Express Step Functions are your solution!
Asynchronous
On the other hand, if your executions may exceed this limit, even if only some of them do, you need to use a Standard Step Function, which has a timeout limit of 1 year, together with the states:StartExecution action (the only possible invocation for Standard workflows). Since the workflow then runs asynchronously, the API returns the execution ARN, like this:
{
"executionArn": "arn:aws:states:<region>:<account>:execution:<function_name>:<execution_name>",
"startDate": <date>
}
With this ARN you can call states:DescribeExecution to get the state of the execution (Running, Succeeded, Failed, …) as well as the output of the state machine. You can fetch this data either with the AWS SDK in your application or by creating another API route that retrieves the execution data for a given execution ARN.
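The polling approach described above can be sketched in Python as follows. This is only a minimal sketch: the function name and its parameters are illustrative, and the DescribeExecution call is injected as a callable so the snippet runs without AWS credentials. With boto3 you would pass `boto3.client("stepfunctions").describe_execution`.

```python
import time
from typing import Any, Callable, Dict

# Statuses after which a Standard execution will not change anymore.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(
    describe_execution: Callable[..., Dict[str, Any]],
    execution_arn: str,
    poll_seconds: float = 2.0,
    max_attempts: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call DescribeExecution until the execution reaches a terminal status."""
    for _ in range(max_attempts):
        result = describe_execution(executionArn=execution_arn)
        if result["status"] in TERMINAL_STATUSES:
            return result
        sleep(poll_seconds)
    raise TimeoutError(
        f"{execution_arn} did not finish after {max_attempts} polls"
    )
```

The `poll_seconds` and `max_attempts` defaults are arbitrary; tune them to the expected duration of your workflow.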
Alternatively, you could publish messages to an SQS queue throughout the execution and have the concerned application consume them directly. This saves your application from repeatedly calling DescribeExecution while it waits for the process to end and report its final state. You could, for example, display the status of the workflow live on a dashboard (say, an EC2 instance that starts, processes some data and then stops).
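On the consuming side, the SQS alternative could look like the sketch below. It assumes a client exposing `receive_message` and `delete_message` with the same keyword arguments as the boto3 SQS client, and it assumes each message body is a JSON document. The function name and the body format are hypothetical choices for this example.

```python
import json
from typing import Any, Callable, Dict


def drain_status_messages(
    sqs_client: Any,
    queue_url: str,
    on_status: Callable[[Dict[str, Any]], None],
) -> int:
    """Receive one batch of status messages, process each one, then delete it."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,  # long polling, avoids hammering the queue
    )
    messages = response.get("Messages", [])
    for message in messages:
        # The body format is an assumption: a JSON object describing the status.
        on_status(json.loads(message["Body"]))
        sqs_client.delete_message(
            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
        )
    return len(messages)
```

A dashboard backend could call this in a loop and push every status it receives to the front end.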
No matter which approach you choose, you also need to define request and response mappings in the API Gateway integration. These are templates that transform the data exchanged between API Gateway and the Step Function.
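To make the role of these mappings concrete, here is a small Python sketch of the transformation they perform. It does not run in API Gateway, and the function names are hypothetical. It only shows that the Step Functions API expects the workflow input as a JSON string under "input", and returns the result of a synchronous execution as a JSON string under "output".

```python
import json
from typing import Any, Dict


def to_start_execution_request(body: Dict[str, Any], state_machine_arn: str) -> str:
    """Request mapping: the API body becomes a JSON *string* under "input"."""
    return json.dumps(
        {"input": json.dumps(body), "stateMachineArn": state_machine_arn}
    )


def from_sync_execution_response(response: Dict[str, Any]) -> Any:
    """Response mapping: "output" is a JSON string that has to be parsed."""
    return json.loads(response["output"])
```

This double encoding is the reason the mapping templates have to escape the request body and parse the output instead of passing both through unchanged.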
Now that you have the keys to choose the type of workflow that fits your needs, let's move on to implementing this infrastructure with Terraform.
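As a recap, the decision rule from this section can be written as a tiny helper. The function name and its parameters are made up for the example, and the 29-second constant is simply the API Gateway limit quoted above.

```python
from typing import Tuple

API_GATEWAY_TIMEOUT_SECONDS = 29  # limit quoted in this article


def choose_invocation(
    max_duration_seconds: float, needs_result_in_response: bool
) -> Tuple[str, str]:
    """Return (state machine type, IAM action) following the rule of thumb above."""
    if needs_result_in_response and max_duration_seconds < API_GATEWAY_TIMEOUT_SECONDS:
        return ("EXPRESS", "states:StartSyncExecution")
    # Anything that may outlive the API Gateway timeout runs asynchronously.
    return ("STANDARD", "states:StartExecution")
```

For instance, `choose_invocation(10, True)` selects the Express workflow, while `choose_invocation(45, True)` falls back to the Standard one.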
Deploy with Terraform
The Step Function
Let's take a simple workflow for this one. It starts by checking the status of a given instance: if the instance is stopped we start it, and if it is already running we stop it. There will be two Step Functions like this, except that one has a 29-second delay timer so we can test the asynchronous approach.
First, create the .tftpl files that will be loaded into Terraform as data sources to deploy the Step Functions:
# ./express_state_machine_template.tftpl
{
"StartAt":"CheckInstanceStatus",
"States":{
"CheckInstanceStatus":{
"Type":"Task",
"Resource":"arn:aws:states:::aws-sdk:ec2:describeInstanceStatus",
"Parameters":{
"InstanceIds.$": "States.Array($.instance_id)"
},
"ResultPath":"$.InstanceStatus",
"Next":"InstanceStatusDecision"
},
"InstanceStatusDecision":{
"Type":"Choice",
"Choices":[
{
"Variable":"$.InstanceStatus.InstanceStatuses[0]",
"IsPresent": false,
"Next":"StartInstance"
},
{
"Variable":"$.InstanceStatus.InstanceStatuses[0].InstanceState.Name",
"StringEquals":"running",
"Next":"StopInstance"
}
],
"Default":"EndState"
},
"StartInstance":{
"Type":"Task",
"Resource":"arn:aws:states:::aws-sdk:ec2:startInstances",
"Parameters":{
"InstanceIds.$": "States.Array($.instance_id)"
},
"End":true
},
"StopInstance":{
"Type":"Task",
"Resource":"arn:aws:states:::aws-sdk:ec2:stopInstances",
"Parameters":{
"InstanceIds.$": "States.Array($.instance_id)"
},
"End":true
},
"EndState":{
"Type":"Succeed"
}
}
}
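If the Choice state is hard to read in JSON, the same decision can be mirrored in a few lines of Python. This is only an illustration of the branching, not something that gets deployed, and the function name is hypothetical. It relies on the fact that, by default, DescribeInstanceStatus only reports running instances, so an empty InstanceStatuses list is treated as "not running".

```python
from typing import Any, Dict


def next_state(state_input: Dict[str, Any]) -> str:
    """Mirror the InstanceStatusDecision Choice state of the template above."""
    statuses = state_input["InstanceStatus"]["InstanceStatuses"]
    if not statuses:
        # No status reported: the instance is not running, so start it.
        return "StartInstance"
    if statuses[0]["InstanceState"]["Name"] == "running":
        return "StopInstance"
    # Any other state (pending, stopping, ...) ends the workflow untouched.
    return "EndState"
```

Feeding it `{"InstanceStatus": {"InstanceStatuses": []}}` returns "StartInstance", which is the path taken for a stopped instance.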
For ./standard_state_machine_template.tftpl, simply add the following state and set "StartAt" to "Wait" so that the execution begins with the delay:
...
"Wait": {
"Type": "Wait",
"Seconds": 29,
"Next": "CheckInstanceStatus"
}
...
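To see how the two definitions relate, the sketch below derives the Standard definition from the Express one in Python. It assumes the Wait state is meant to run first, so it also repoints "StartAt". The helper name is hypothetical and the repository keeps two separate template files instead.

```python
import copy
from typing import Any, Dict


def with_initial_wait(definition: Dict[str, Any], seconds: int = 29) -> Dict[str, Any]:
    """Return a copy of the definition that starts with a Wait state."""
    delayed = copy.deepcopy(definition)
    delayed["States"]["Wait"] = {
        "Type": "Wait",
        "Seconds": seconds,
        "Next": definition["StartAt"],  # hand over to the original first state
    }
    delayed["StartAt"] = "Wait"
    return delayed
```

Loading the Express template with `json.load`, passing it through this function and dumping the result gives the Standard definition with the delay in front.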
Next, import the templates as data sources and define your Step Function resources:
# ./sfn.tf
data "template_file" "express_state_machine_template" {
template = file("${path.module}/express_state_machine_template.tftpl")
vars = {
instance_id = aws_instance.example.id
}
}
resource "aws_sfn_state_machine" "express_sfn_state_machine" {
name = "${var.project_name}-express"
role_arn = aws_iam_role.iam_for_sfn.arn
type = "EXPRESS"
definition = data.template_file.express_state_machine_template.rendered
}
data "template_file" "standard_state_machine_template" {
template = file("${path.module}/standard_state_machine_template.tftpl")
vars = {
instance_id = aws_instance.example.id
}
}
resource "aws_sfn_state_machine" "standard_sfn_state_machine" {
name = "${var.project_name}-standard"
role_arn = aws_iam_role.iam_for_sfn.arn
type = "STANDARD"
definition = data.template_file.standard_state_machine_template.rendered
}
IAM roles
Then we define the two IAM roles that will be used: one for the Step Function execution and one that allows API Gateway to invoke the state machines.
# ./iam.tf
data "aws_iam_policy_document" "assume_role_policy_sfn" {
statement {
effect = "Allow"
principals {
identifiers = ["states.amazonaws.com"]
type = "Service"
}
actions = ["sts:AssumeRole"]
}
}
data "aws_iam_policy_document" "role_policy_sfn" {
statement {
effect = "Allow"
actions = [
"ec2:*"
]
resources = [
"*",
]
}
statement {
sid = "LoggingPolicy"
effect = "Allow"
actions = [
"logs:*"
]
resources = [
"*"
]
}
}
resource "aws_iam_role" "iam_for_sfn" {
name = "stepFunctionExecutionIAM"
inline_policy {
name = "PolicyForSfn"
policy = data.aws_iam_policy_document.role_policy_sfn.json
}
assume_role_policy = data.aws_iam_policy_document.assume_role_policy_sfn.json
}
data "aws_iam_policy_document" "assume_role_policy_apigw" {
statement {
sid = ""
effect = "Allow"
principals {
identifiers = ["apigateway.amazonaws.com"]
type = "Service"
}
actions = ["sts:AssumeRole"]
}
}
data "aws_iam_policy_document" "policy_start_sfn" {
statement {
sid = "ApiGwPolicy"
effect = "Allow"
actions = [
"states:StartSyncExecution",
"states:StartExecution"
]
resources = [
"*"
]
}
}
resource "aws_iam_role" "iam_for_apigw_start_sfn" {
name = "${var.project_name}-apigw-exec-sfn"
assume_role_policy = data.aws_iam_policy_document.assume_role_policy_apigw.json
}
resource "aws_iam_role_policy" "policy_start_sfn" {
policy = data.aws_iam_policy_document.policy_start_sfn.json
role = aws_iam_role.iam_for_apigw_start_sfn.id
}
EC2 instance
We just need a simple EC2 instance. Nothing particular here: it only needs to be started and stopped.
# ./ec2.tf
data "aws_ami" "amazon_linux_2023" {
most_recent = true
owners = ["amazon"]
filter {
name = "architecture"
values = ["x86_64"]
}
filter {
name = "description"
values = ["Amazon Linux 2023 *"]
}
}
resource "aws_instance" "example" {
ami = data.aws_ami.amazon_linux_2023.id
instance_type = "t3.nano"
tags = {
Name = "HelloWorld"
}
}
Finally, API Gateway!
Our API definition contains the following resources:
- api_gateway_rest_api: the API itself
- api_gateway_resource: the resource name and path part (one per Step Function)
- api_gateway_method: the HTTP verb associated with the resource (one per Step Function)
- api_gateway_deployment: the deployment of the API definition
- api_gateway_integration: the link between your API route and your Step Function (one per Step Function). This is where you define the request mapping template that transforms the input and passes it to the state machine. In the integration, we also define how the state machine is invoked, states:StartSyncExecution or states:StartExecution, by setting the uri parameter with the structure arn:aws:apigateway:<region>:states:action/<action_type>. The request mapping template contains the input binding as well as the stateMachineArn.
- api_gateway_method_response and api_gateway_integration_response: the link between your Step Function output and API Gateway. This is where you define the response mapping template that transforms your Step Function result and passes it to API Gateway (one per Step Function)
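The uri structure described in the list is easy to get wrong, so here is a small Python helper that builds it. The helper itself is hypothetical (Terraform builds the string with interpolation), but the resulting format is the one given above.

```python
ALLOWED_ACTIONS = {"StartExecution", "StartSyncExecution"}


def sfn_integration_uri(region: str, action: str) -> str:
    """Build the API Gateway integration uri for a Step Functions action."""
    if action not in ALLOWED_ACTIONS:
        raise ValueError(f"unsupported action for this article: {action}")
    return f"arn:aws:apigateway:{region}:states:action/{action}"
```

For example, `sfn_integration_uri("eu-west-1", "StartSyncExecution")` gives the uri to use for the Express route in that region.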
# ./apigw.tf
resource "aws_api_gateway_rest_api" "apigw" {
name = "${var.project_name}-apigw"
}
resource "aws_api_gateway_resource" "express_ec2" {
parent_id = aws_api_gateway_rest_api.apigw.root_resource_id
path_part = "express-instance"
rest_api_id = aws_api_gateway_rest_api.apigw.id
}
resource "aws_api_gateway_method" "express_post" {
authorization = "NONE"
http_method = "POST"
resource_id = aws_api_gateway_resource.express_ec2.id
rest_api_id = aws_api_gateway_rest_api.apigw.id
}
resource "aws_api_gateway_resource" "standard_ec2" {
parent_id = aws_api_gateway_rest_api.apigw.root_resource_id
path_part = "standard-instance"
rest_api_id = aws_api_gateway_rest_api.apigw.id
}
resource "aws_api_gateway_method" "standard_post" {
authorization = "NONE"
http_method = "POST"
resource_id = aws_api_gateway_resource.standard_ec2.id
rest_api_id = aws_api_gateway_rest_api.apigw.id
}
resource "aws_api_gateway_deployment" "example" {
rest_api_id = aws_api_gateway_rest_api.apigw.id
triggers = {
redeployment = sha1(jsonencode([
aws_api_gateway_resource.express_ec2.id,
aws_api_gateway_method.express_post.id,
aws_api_gateway_integration.express_integration.id,
aws_api_gateway_resource.standard_ec2.id,
aws_api_gateway_method.standard_post.id,
aws_api_gateway_integration.standard_integration.id,
]))
}
lifecycle {
create_before_destroy = true
}
}
resource "aws_api_gateway_integration" "express_integration" {
rest_api_id = aws_api_gateway_rest_api.apigw.id
resource_id = aws_api_gateway_resource.express_ec2.id
http_method = aws_api_gateway_method.express_post.http_method
integration_http_method = "POST"
type = "AWS"
uri = (
"arn:aws:apigateway:${var.region}:states:action/StartSyncExecution"
)
credentials = aws_iam_role.iam_for_apigw_start_sfn.arn
request_templates = {
"application/json" = <<EOF
#set($input = $input.json('$'))
{
"input": "$util.escapeJavaScript($input).replaceAll("\\'", "'")",
"stateMachineArn": "${aws_sfn_state_machine.express_sfn_state_machine.arn}"
}
EOF
}
}
resource "aws_api_gateway_integration" "standard_integration" {
rest_api_id = aws_api_gateway_rest_api.apigw.id
resource_id = aws_api_gateway_resource.standard_ec2.id
http_method = aws_api_gateway_method.standard_post.http_method
integration_http_method = "POST"
type = "AWS"
uri = (
"arn:aws:apigateway:${var.region}:states:action/StartExecution"
)
credentials = aws_iam_role.iam_for_apigw_start_sfn.arn
request_templates = {
"application/json" = <<EOF
#set($input = $input.json('$'))
{
"input": "$util.escapeJavaScript($input).replaceAll("\\'", "'")",
"stateMachineArn": "${aws_sfn_state_machine.standard_sfn_state_machine.arn}"
}
EOF
}
}
resource "aws_api_gateway_method_response" "express_response_200" {
rest_api_id = aws_api_gateway_rest_api.apigw.id
resource_id = aws_api_gateway_resource.express_ec2.id
http_method = aws_api_gateway_method.express_post.http_method
status_code = "200"
}
resource "aws_api_gateway_integration_response" "express_response_200" {
rest_api_id = aws_api_gateway_rest_api.apigw.id
resource_id = aws_api_gateway_resource.express_ec2.id
http_method = aws_api_gateway_method.express_post.http_method
status_code = aws_api_gateway_method_response.express_response_200.status_code
response_templates = {
"application/json" = <<EOF
#set ($parsedPayload = $util.parseJson($input.json('$.output')))
$parsedPayload
EOF
}
}
resource "aws_api_gateway_method_response" "standard_response_200" {
rest_api_id = aws_api_gateway_rest_api.apigw.id
resource_id = aws_api_gateway_resource.standard_ec2.id
http_method = aws_api_gateway_method.standard_post.http_method
status_code = "200"
}
resource "aws_api_gateway_integration_response" "standard_response_200" {
rest_api_id = aws_api_gateway_rest_api.apigw.id
resource_id = aws_api_gateway_resource.standard_ec2.id
http_method = aws_api_gateway_method.standard_post.http_method
status_code = aws_api_gateway_method_response.standard_response_200.status_code
response_templates = {
"application/json" = <<EOF
#set ($parsedPayload = $util.parseJson($input.json('$')))
$parsedPayload
EOF
}
}
Let's test it !
Make sure to apply the Terraform configuration with terraform apply.
Synchronous route
Go to your API Gateway, choose the /express-instance resource with the POST verb and, in the "Test" tab, provide a JSON request body containing the instance_id key that the state machine reads.
After a few seconds, the test output first shows the input transformation, and then the output of the workflow.
Now your instance is running.
Asynchronous route
Go to your API Gateway, choose the /standard-instance resource with the POST verb and open the "Test" tab.
Enter the same input as before. This time you get the result instantly, and the input transformation remains the same.
But the output now contains the executionArn.
If you want to check the status of the execution or its output, you can use states:DescribeExecution. You can call it from the CLI, but ideally you would use the SDK in your application or in a Lambda exposed through your API Gateway.
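A Lambda exposed on the API could be sketched as follows. This is not part of the repository: the factory name, the executionArn query parameter and the response shape are assumptions made for the example. The Step Functions client is injected so the snippet can run without AWS. In a real Lambda you would build the handler with `boto3.client("stepfunctions")`.

```python
import json
from typing import Any, Callable, Dict


def make_handler(sfn_client: Any) -> Callable[[Dict[str, Any], Any], Dict[str, Any]]:
    """Build a Lambda proxy handler that reports the state of an execution."""

    def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        # Hypothetical contract: the ARN arrives as ?executionArn=...
        params = event.get("queryStringParameters") or {}
        execution_arn = params.get("executionArn")
        if not execution_arn:
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "executionArn is required"}),
            }
        execution = sfn_client.describe_execution(executionArn=execution_arn)
        body = {"status": execution["status"]}
        if "output" in execution:
            # The output is only present once the execution has succeeded.
            body["output"] = json.loads(execution["output"])
        return {"statusCode": 200, "body": json.dumps(body)}

    return handler
```

Your application can then call this route with the executionArn returned by the asynchronous route until the status is no longer RUNNING.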
Go further…
In addition to the approaches presented in this article, you could have your Step Function publish SQS messages wherever you want your application to be notified of a status change. This lets your application react faster to the internal state changes of the workflow.
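As a rough idea of what such a notification step could look like, the Python sketch below generates a Task state that uses the Step Functions SQS integration (arn:aws:states:::sqs:sendMessage). The function name, the queue URL and the message fields are placeholders chosen for the example, not something taken from the repository.

```python
import json
from typing import Any, Dict


def sqs_notification_state(queue_url: str, status: str, next_state: str) -> Dict[str, Any]:
    """Build a Task state that publishes a status message to an SQS queue."""
    return {
        "Type": "Task",
        "Resource": "arn:aws:states:::sqs:sendMessage",
        "Parameters": {
            "QueueUrl": queue_url,
            "MessageBody": {
                "status": status,  # hypothetical field read by the application
                "execution_arn.$": "$$.Execution.Id",  # taken from the context object
            },
        },
        "ResultPath": None,  # serialized as null: keep the state input unchanged
        "Next": next_state,
    }


if __name__ == "__main__":
    state = sqs_notification_state(
        "https://sqs.example.invalid/queue", "INSTANCE_STARTING", "StartInstance"
    )
    print(json.dumps(state, indent=2))
```

You would insert one of these states before each step whose status matters to the application, and allow sqs:SendMessage in the Step Function execution role.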
To finish, you can read this article by AWS about integrating Step Functions and tracking their activity with API Gateway v2 WebSockets.
Thanks for reading! I hope this helped you use, or understand how to take advantage of, your Step Functions through API Gateway. Don't hesitate to give me your feedback or suggestions.