DEV Community

Steven Smiley
AWS Step Functions: Handling Paginated API Responses

Since AWS Step Functions added support for AWS SDK integration, it has become very powerful for serverless integration of AWS services. Previously, the go-to approach would be a simple Lambda function, but Step Functions involves even less maintenance. However, Step Functions has (mostly deliberate) limitations that we need to work with/around.

Some AWS API actions return paginated responses, meaning you get a chunk of the total response and you have to ask for the next chunk if you want it. So if there are 150 items, you may need to pull them 25 at a time. How do we handle this in Step Functions, where each state only gets to make one call?

The key to handling pagination is to loop based on the presence of a continuation token, usually named NextToken. When a paginated response is returned, the JSON includes this token, which (1) indicates that there are additional items to retrieve and (2) must be provided in the subsequent API call to request the next page. Unfortunately, the token name isn't consistent across services; see Ian McKay's complete list of rules for AWS SDK pagination.
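Outside Step Functions, the same loop is easy to picture in plain Python. The sketch below is illustrative only: `fake_describe` is a hypothetical stand-in for a paginated API (it makes no real AWS call), the data is made up, and the page size of 25 simply mirrors the numbers used above.

```python
from typing import Callable, Iterator, Optional

ITEMS = [f"item-{i}" for i in range(150)]  # made-up data for the sketch
PAGE_SIZE = 25


def fake_describe(next_token: Optional[str] = None) -> dict:
    """Hypothetical paginated API: one page, plus NextToken if more remain."""
    start = int(next_token) if next_token else 0
    end = start + PAGE_SIZE
    page = {"Items": ITEMS[start:end]}
    if end < len(ITEMS):
        # Real tokens are opaque; this one just happens to encode an offset.
        page["NextToken"] = str(end)
    return page


def fetch_all(describe: Callable[..., dict]) -> Iterator[str]:
    """Keep calling until the response no longer contains NextToken."""
    response = describe()
    yield from response["Items"]
    while "NextToken" in response:
        response = describe(next_token=response["NextToken"])
        yield from response["Items"]


all_items = list(fetch_all(fake_describe))
print(len(all_items))
```

The loop condition is the presence of the token, not a count of items, which is the same test the state machine has to make.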

Here's a concrete example. Let's say we want to perform an action, like updating to the latest image, on every instance of the Amazon WorkSpaces service (each instance represents a virtual desktop). workspaces:DescribeWorkspaces returns 25 at a time, but we have several hundred.

Diagram of workflow

We call DescribeWorkspaces to get the first set of items and map each one to our desired action, in this case RebuildWorkspaces. Then a Choice state checks if NextToken is present in the task result. If so, call DescribeWorkspaces again, this time using the NextToken parameter. Critically, this overwrites the task result from the first call, so the next time we reach the Choice state, it can check if there are even more items to retrieve (NextToken is still present). If we've reached the end (NextToken isn't present), we can move on with the workflow.
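To see why overwriting the result matters, here is a rough Python simulation of the state data as it moves through the loop. It is not how Step Functions is implemented: `describe_page`, its offset-style tokens, and the workspace IDs are all made up for illustration. Only the `DescribeWorkspacesResult` key is taken from the article, where it is the result path in the CDK definition.

```python
from typing import List, Optional, Tuple

WORKSPACES = [{"WorkspaceId": f"ws-{i:03d}"} for i in range(60)]  # made-up IDs


def describe_page(next_token: Optional[str] = None, page_size: int = 25) -> dict:
    """Hypothetical stand-in for DescribeWorkspaces."""
    start = int(next_token) if next_token else 0
    result = {"Workspaces": WORKSPACES[start:start + page_size]}
    if start + page_size < len(WORKSPACES):
        result["NextToken"] = str(start + page_size)
    return result


def run_workflow() -> Tuple[List[str], int]:
    state: dict = {}
    rebuilt: List[str] = []
    calls = 0

    # First task: DescribeWorkspaces with no token.
    state["DescribeWorkspacesResult"] = describe_page()
    calls += 1
    while True:
        # Map state: act on every workspace in the current page.
        for workspace in state["DescribeWorkspacesResult"]["Workspaces"]:
            rebuilt.append(workspace["WorkspaceId"])
        # Choice state: is NextToken present in the latest result?
        token = state["DescribeWorkspacesResult"].get("NextToken")
        if token is None:
            break
        # Second task writes to the same key, replacing the previous page and token.
        state["DescribeWorkspacesResult"] = describe_page(next_token=token)
        calls += 1
    return rebuilt, calls


rebuilt_ids, api_calls = run_workflow()
print(len(rebuilt_ids), api_calls)
```

Because each call writes to the same key, the final page (which has no token) replaces the previous one, and the presence check then ends the loop. If the second task wrote its result somewhere else, the old token would still be present and the check would never fail.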

Here's the full definition of this Step Functions state machine in AWS CDK with Python.



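from aws_cdk import Stack
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as sfn_tasks
from constructs import Construct


# The stack name and constructor wrapper are assumed here (CDK v2 style);
# the original snippet shows only the body of the constructor.
class WorkSpacesRebuilderStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        describe_workspaces = sfn_tasks.CallAwsService(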
            self,
            id="DescribeWorkspaces",
            comment="Get workspaces",
            service="workspaces",
            action="describeWorkspaces",
            result_path="$.DescribeWorkspacesResult",
            iam_resources=["*"],
        )

        describe_more_workspaces = sfn_tasks.CallAwsService(
            self,
            id="DescribeMoreWorkspaces",
            comment="Get workspaces with NextToken",
            service="workspaces",
            action="describeWorkspaces",
            parameters={
                "NextToken": sfn.JsonPath.string_at(
                    "$.DescribeWorkspacesResult.NextToken"
                )
            },
            result_path="$.DescribeWorkspacesResult",
            iam_resources=["*"],
        )

        rebuild_workspaces = sfn_tasks.CallAwsService(
            self,
            id="RebuildWorkspaces",
            comment="Rebuild workspaces",
            service="workspaces",
            action="rebuildWorkspaces",
            parameters={
                "RebuildWorkspaceRequests": [
                    {"WorkspaceId": sfn.JsonPath.string_at("$.WorkspaceId")}
                ]
            },
            result_path="$.RebuildWorkspacesResult",
            iam_resources=["*"],
        )

        rebuild_each_workspace = sfn.Map(
            self,
            id="RebuildEachWorkspace",
            comment="Rebuild each workspace",
            items_path="$.DescribeWorkspacesResult.Workspaces",
            output_path=sfn.JsonPath.DISCARD,
        )
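        # A Map state needs an iterator. The Pass state here is a placeholder;
        # pass rebuild_workspaces instead to actually rebuild each workspace.
        rebuild_each_workspace.iterator(sfn.Pass(self, "Map State"))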

        definition = describe_workspaces.next(rebuild_each_workspace).next(
            sfn.Choice(self, "ChoiceMoreWorkspaces")
            .when(
                sfn.Condition.is_present("$.DescribeWorkspacesResult.NextToken"),
                describe_more_workspaces.next(rebuild_each_workspace),
            )
            .otherwise(sfn.Succeed(self, "Done"))
        )

        state_machine = sfn.StateMachine(
            self,
            id="WorkSpacesRebuilderStateMachine",
            state_machine_type=sfn.StateMachineType.STANDARD,
            definition=definition,
        )
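
For readers who write Amazon States Language directly rather than CDK, the Choice state should come out roughly as follows. This is a hand-written approximation built as a Python dictionary, not actual `cdk synth` output, and it assumes the state names match the construct IDs used above.

```python
import json

# Hand-written approximation of the Choice state in Amazon States Language.
# State names are assumed to match the CDK construct IDs used above.
choice_state = {
    "Type": "Choice",
    "Choices": [
        {
            "Variable": "$.DescribeWorkspacesResult.NextToken",
            "IsPresent": True,
            "Next": "DescribeMoreWorkspaces",
        }
    ],
    "Default": "Done",
}

print(json.dumps({"ChoiceMoreWorkspaces": choice_state}, indent=2))
```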



Thanks to Karsten Lang for fixing an error in a previous version.

Top comments (2)

Karsten Lang

The example throws an error that leads to a GitHub issue; it can be solved by adding the following line:

        rebuild_each_workspace.iterator(sfn.Pass(self, "Map State"))

The finished cdk deploy-able example is available here ..

Jonas Mellquist

Thanks for this write up

Have you ever stumbled upon this error:
You specified an invalid value for nextToken. You must get the value from the response to a previous call to the API. (Service: Organizations, Status Code: 400, Request ID: 4e0f3bc2-7219-4869-848a-f2b2f6423523)

Related to docs.aws.amazon.com/organizations/...

Do you know the structure of NextToken? How to validate that a given NextToken is valid and so forth?