Roger Chi for AWS Community Builders

Step Functions Distributed Map and Athena, a match made in cloud heaven

Late last year, Step Functions released a feature called Distributed Map, which lets the service coordinate parallel processing over huge datasets (millions of objects). Read more in the announcement blog post: https://aws.amazon.com/blogs/aws/step-functions-distributed-map-a-serverless-solution-for-large-scale-parallel-data-processing/

One of the input formats that can be used for the distributed map state is a .csv file in an S3 bucket. This opens up an opportunity to use the optimized Athena integration for Step Functions to generate the input .csv file that can be processed by the distributed map state.

Configuration

The distributed map state is configured to read a .csv file from S3 by using the ItemReader property:

  "ItemReader": {
    "Resource": "arn:aws:states:::s3:getObject",
    "ReaderConfig": {
      "InputType": "CSV",
      "CSVHeaderLocation": "FIRST_ROW"
    },
    "Parameters": {
      "Bucket.$": "$.CsvBucket",
      "Key.$": "$.CsvKey"
    }
  }

The Athena StartQueryExecution task uses a ResultSelector to emit CsvBucket and CsvKey as its output, which then becomes the input to the distributed map state:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
  "Parameters": {
    "QueryString": "select * from mytable",
    "WorkGroup": "primary"
  },
  "ResultSelector": {
    "CsvBucket.$": "States.ArrayGetItem(States.StringSplit($.QueryExecution.ResultConfiguration.OutputLocation, ':/'), 1)",
    "CsvKey.$": "States.ArrayGetItem(States.StringSplit($.QueryExecution.ResultConfiguration.OutputLocation, ':/'), 2)"
  },
  "Next": "DistributedMapState"
}

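The ResultSelector takes the S3 URI at $.QueryExecution.ResultConfiguration.OutputLocation and uses intrinsic functions to parse out the bucket and key of the CSV result file. States.StringSplit treats each character of ':/' as a separator, so a URI of the form s3://bucket/file.csv yields the scheme, the bucket, and the key at indexes 0, 1, and 2. Note that this assumes the .csv output is at the root level of the query results bucket. With a key prefix, the key would be split across several array elements and index 2 would hold only its first segment.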
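As a worked illustration of that parsing, the snippet below pairs a trimmed task result with the output the ResultSelector would be expected to produce. The bucket name and file name are made up, and the top-level TaskResult and AfterResultSelector keys are only labels for this illustration, not fields that Step Functions emits.

```json
{
  "Comment": "Illustration only: bucket and file names are hypothetical",
  "TaskResult": {
    "QueryExecution": {
      "ResultConfiguration": {
        "OutputLocation": "s3://my-athena-results/1a2b3c4d-example.csv"
      }
    }
  },
  "AfterResultSelector": {
    "CsvBucket": "my-athena-results",
    "CsvKey": "1a2b3c4d-example.csv"
  }
}
```

If the workgroup wrote results under a prefix such as s3://my-athena-results/reports/, the same expression would return only "reports" as CsvKey, which is why the root-level assumption matters.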

Example

Here is a simple example of what's possible with this workflow:

[Figure: State machine with Distributed Map to copy image files from an Athena query]

In this state machine, we have an Athena query that retrieves a list of S3 URIs containing image files that we want to copy to a separate location for further processing. We use the distributed map state to process these S3 URIs and use the SDK integration to call S3 CopyObject to copy the images to the other location.
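A minimal sketch of what the distributed map state in such a workflow could look like is shown below. It is not the exact definition behind the example: the destination bucket name, the MaxConcurrency value, and the source_bucket and source_key column names are assumptions. It also assumes the query returns the bucket and key as separate columns rather than a single URI, so that each CSV row arrives at the child workflow as an object keyed by the header names.

```json
{
  "DistributedMapState": {
    "Type": "Map",
    "Comment": "Sketch only: bucket name, column names and concurrency are assumptions",
    "ItemReader": {
      "Resource": "arn:aws:states:::s3:getObject",
      "ReaderConfig": {
        "InputType": "CSV",
        "CSVHeaderLocation": "FIRST_ROW"
      },
      "Parameters": {
        "Bucket.$": "$.CsvBucket",
        "Key.$": "$.CsvKey"
      }
    },
    "ItemProcessor": {
      "ProcessorConfig": {
        "Mode": "DISTRIBUTED",
        "ExecutionType": "STANDARD"
      },
      "StartAt": "CopyImage",
      "States": {
        "CopyImage": {
          "Type": "Task",
          "Resource": "arn:aws:states:::aws-sdk:s3:copyObject",
          "Parameters": {
            "Bucket": "my-destination-bucket",
            "CopySource.$": "States.Format('{}/{}', $.source_bucket, $.source_key)",
            "Key.$": "$.source_key"
          },
          "End": true
        }
      }
    },
    "MaxConcurrency": 100,
    "End": true
  }
}
```

Each child execution receives one row of the Athena result and issues a single CopyObject call, so the copy logic stays inside the state machine and needs no Lambda function.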

Conclusion

Step Functions Distributed Map, combined with Athena queries, enables orchestrated serverless workflows that process huge amounts of data entirely within the Step Functions service.

About me

I am a Staff Engineer @ Veho. We are still actively hiring for some key roles! If you are passionate about serverless, or even looking to grow your skills, have a look at our open positions!
