Aravind V

Posted on • Updated on • Originally published at devpost.hashnode.dev

🐇 AWS CDK 101 - 🍭 StateMachine and StepFunctions replacing our SQS based lambda trigger

🔰 Beginners new to AWS CDK, please go through my previous articles in this series one by one.

If you missed my previous article, you can find it at the links below.

Original previous post at 🔗 Dev Post

Reposted previous post at 🔗 dev to @aravindvcyber

In this article, let us refactor our previous event rule, which sends messages to a queue that triggers the lambda directly, into a new rule that invokes a state machine, which in turn invokes our lambda as a step in its workflow.

Benefits achieved in this approach 🚣‍♀️

This approach has several benefits.

  • AWS Step Functions lets you coordinate multiple AWS services into serverless workflows, so you can build and update apps quickly.

  • We replace the direct invocation of our lambda by SQS with an indirect invocation via a state machine.

  • With a state machine, we can apply transformations and conditional checks, and orchestrate the workflow by adding several steps, each representing a distinct chunk of the work.

  • This lets us eventually refactor most of the business flow logic out of the lambda and into the state machine definition, making our lambdas/processors more general so that they can be shared among other tasks.

  • The state machine also provides metrics, logs, and a visual reference to the actual point of failure, which can be hard to trace inside a traditional monolithic lambda.

New construct for state machine 🚧

Let us start by creating a new file constructs/sfn-simple.ts

We will start by importing the common modules along with stepfunctions and stepfunctions-tasks as follows.

import { Duration, RemovalPolicy } from "aws-cdk-lib";
import { Effect, Policy, PolicyStatement } from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import { LogGroup, RetentionDays } from "aws-cdk-lib/aws-logs";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import { LogLevel } from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import { Construct } from "constructs";

The imports beyond the Step Functions modules (Duration, RemovalPolicy, and the IAM and logs classes) were discussed in earlier articles in this series, so they should look familiar.

Let us add a new props interface, sfnProps, to carry the input this construct requires from the stack that instantiates it.


export interface sfnProps {
  triggerFunction: lambda.Function;
  timeout: Duration;
}

In the above block of code, triggerFunction is our backend lambda, which we will write later in this article once the state machine definition is done. timeout limits the maximum total time allowed for a state machine execution.

Construct skeleton

Let us create a model construct template as shown below.


export class simpleSfnConstruct extends Construct {

  public readonly sfnMachine: sfn.StateMachine

  constructor(scope: Construct, id: string, props: sfnProps) {
    super(scope, id);
  }
}

Note the read-only property sfnMachine, which the stack uses to refer to the state machine created inside the constructor.

const { triggerFunction,timeout } = props;

Usual destructuring of our props inside the constructor.

Lambda payload with taskToken 🔑


const sfnTaskPayload = sfn.TaskInput.fromObject({
      "MyTaskToken": sfn.JsonPath.taskToken,
      "Record": {
        "messageId.$": "$.id",
        "createdAt.$": "$.time",
        "event.$": "States.StringToJson($.detail.message)"
      }
});

Here sfnTaskPayload defines the payload that the step passes as input when it invokes the lambda and waits for its completion.

The important properties to note are the following.

  • MyTaskToken receives sfn.JsonPath.taskToken from the context data, after which the step pauses and waits. The lambda later reports its result through SendTaskSuccess or SendTaskFailure, using MyTaskToken as the reference. This produces the step output without repeatedly polling the lambda for its status, which saves us some state transitions.

  • In the Record section, we have moved part of the message-level data extraction from the processor code base into the step itself, compared to our previous article. This uses JSON path syntax. Doing this kind of transformation and data extraction before invoking the compute gives us fine-grained control over the workflow logic, which is useful for whoever designs and maintains the state machine.
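
To make the mapping concrete, here is a minimal sketch in plain TypeScript of what the payload template is expected to produce for one event. It only mimics the three JSON path expressions; the interface names, the helper buildRecorderPayload, and the sample event values are assumptions for illustration and are not part of the stack.

```typescript
// Hypothetical shape of the event arriving from the bus (illustration only).
interface IncomingEvent {
  id: string;
  time: string;
  detail: { message: string };
}

// Shape of the input the lambda receives from the step.
interface RecorderPayload {
  MyTaskToken: string;
  Record: { messageId: string; createdAt: string; event: unknown };
}

// Mirrors the mappings declared in sfnTaskPayload.
function buildRecorderPayload(event: IncomingEvent, taskToken: string): RecorderPayload {
  return {
    MyTaskToken: taskToken, // sfn.JsonPath.taskToken
    Record: {
      messageId: event.id, // "$.id"
      createdAt: event.time, // "$.time"
      event: JSON.parse(event.detail.message), // States.StringToJson($.detail.message)
    },
  };
}

// Sample values are made up for this sketch.
const sampleEvent: IncomingEvent = {
  id: "cdd51245-987a-b3c7-eecf-6d6d63046073",
  time: "2022-04-16T10:00:00Z",
  detail: { message: '{"greeting":"hello"}' },
};

const samplePayload = buildRecorderPayload(sampleEvent, "example-token");
console.log(JSON.stringify(samplePayload, null, 2));
```

The lambda therefore no longer needs to parse the message string itself, since the event field already arrives as an object.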

const recordMsg = new tasks.LambdaInvoke(this, "Record Message", {
      lambdaFunction: triggerFunction,
      timeout: Duration.minutes(1),
      comment: "Record message in dynamo",
      integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
      inputPath: "$",
      payload: sfnTaskPayload,
      resultSelector: {
        "Payload.$": "$",
        "StatusCode.$": "$.statusCode"
      },
      resultPath: "$.recordResult",
});

Final status steps 📲

These are just formal success and failure end states, which we create in our state machine to better visualize what happened during the workflow execution.

const jobFailed = new sfn.Fail(this, "Job Failed", {
      comment: "Job Failed"
});
const jobSucceed = new sfn.Succeed(this, "Job Succeed", {
      comment: "Job Succeed"
});


Choice step for branching the workflow ✈️

The choice step serves as a visual and functional decision point based on the job status returned by the lambda invocation in the previous step. Here we choose the next step using the data from the previous step's output. Keep in mind that a SendTaskFailure callback fails the Record Message task itself, so unless that error is caught, the execution ends there and never reaches this choice.

const checkStatus = new sfn.Choice(this, "Check Status?",{
      inputPath: "$.recordResult"
    })
    .when(sfn.Condition.numberEquals("$.StatusCode", 500), jobFailed)
    .when(sfn.Condition.numberEquals("$.StatusCode", 200), jobSucceed)
    .otherwise(jobFailed);
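
As a rough sketch of how the task output reaches this choice, the plain TypeScript below imitates the resultSelector, resultPath, and choice rules with ordinary functions. It does not use Step Functions or the CDK; the function names and sample values are assumptions made for illustration.

```typescript
// Shape of the output the recorder lambda sends through SendTaskSuccess.
interface TaskOutput {
  statusCode: number;
  putStatus: { messageId: string };
}

// resultSelector: { "Payload.$": "$", "StatusCode.$": "$.statusCode" }
function applyResultSelector(output: TaskOutput) {
  return { Payload: output, StatusCode: output.statusCode };
}

// resultPath "$.recordResult": the selected result is attached to the state input.
function applyResultPath<T extends object>(stateInput: T, output: TaskOutput) {
  return { ...stateInput, recordResult: applyResultSelector(output) };
}

// The choice step: inputPath "$.recordResult", two rules, then the default.
function checkStatus(state: { recordResult: { StatusCode: number } }): "Job Succeed" | "Job Failed" {
  const { StatusCode } = state.recordResult;
  if (StatusCode === 500) return "Job Failed";
  if (StatusCode === 200) return "Job Succeed";
  return "Job Failed"; // otherwise(jobFailed)
}

// Sample values are made up for this sketch.
const afterRecord = applyResultPath(
  { id: "sample-id" },
  { statusCode: 200, putStatus: { messageId: "sample-id" } }
);
console.log(checkStatus(afterRecord)); // prints "Job Succeed"
```

Any status other than 200 ends in Job Failed, because both the explicit 500 rule and the default branch point to the same state.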

Stepfunction chaining into statemachine definition 🔗

You can see from the below code that the state machine definition is nothing but a chain of the steps we defined earlier. Each step is connected to the next one as a simple chain, though the chain can also contain branching steps like the choice step.

const sfnDef = recordMsg.next(checkStatus);

Statemachine log group 🌼

A new log group is created to contain the logs received from the statemachine execution as shown below. This will be used in the statemachine implementation part.


const sfnLog = new LogGroup(this, "sfnLog", {
      logGroupName: "sfnLogGroup",
      removalPolicy: RemovalPolicy.DESTROY,
      retention: RetentionDays.ONE_WEEK
})


Statemachine specification 🐩

With that, we can now define the state machine properties as shown below. The definition includes the sfnDef and sfnLog objects created earlier.

const stateMachine = new sfn.StateMachine(this, "msgStateMachine", {
      definition: sfnDef,
      timeout: timeout,
      logs: {
        destination: sfnLog,
        includeExecutionData: true,
        level: LogLevel.ALL
      }
});


Granting invoke lambda to statemachine 🌹

Since the lambda is a separate resource, we explicitly grant the state machine execution role permission to invoke it by adding a new IAM policy statement, sfnLambdaInvokePolicy, as shown below. The Record Message step in the workflow relies on this permission.

const sfnLambdaInvokePolicy = new Policy(this, 'sfnLambdaInvokePolicy');
    sfnLambdaInvokePolicy.addStatements(
      new PolicyStatement({
        actions:[
          "lambda:InvokeFunction"
      ],
        effect: Effect.ALLOW,
        resources: [`${triggerFunction.functionArn}:$LATEST`],
        sid: "sfnLambdaInvokePolicy"
      })
    )
stateMachine.role.attachInlinePolicy(sfnLambdaInvokePolicy)

Granting lambda execution role for sending status update 🍁

Since we are not going to poll the lambda repeatedly for its status, we expect the lambda to call back the state machine with the job completion result. We have already sent the token as part of the payload to the lambda, which posts a message back to the state machine reporting success or failure. Until then, the state machine stays paused at its current step.
The IAM policy statement below grants the processor lambda's execution role the privileges it needs for this.

const lambdaSfnStatusUpdatePolicy = new Policy(this, 'lambdaSfnStatusUpdatePolicy');
    lambdaSfnStatusUpdatePolicy.addStatements(
      new PolicyStatement({
        actions:[
          "states:SendTaskSuccess",
          "states:SendTaskFailure",
      ],
        effect: Effect.ALLOW,
        resources: ['*'],
        sid: "lambdaSfnStatusUpdatePolicy"
      })
    )
triggerFunction.role?.attachInlinePolicy(lambdaSfnStatusUpdatePolicy)

Setting the sfnMachine readonly property 🐲

This is required to access the state machine object from the stack where the construct is instantiated and to use it for further integration.

stateMachine.applyRemovalPolicy(RemovalPolicy.DESTROY);

this.sfnMachine = stateMachine

New lambda to record the message into the dynamodb 🌷

Let us create a new file, lambda/message-recorder.ts. In this lambda, we only implement saving the message to the specified dynamodb table, plus the logic to send the status callback for the success or failure scenario with the right output message.

import { PutItemInput } from "aws-sdk/clients/dynamodb";

import { DynamoDB, StepFunctions } from "aws-sdk";

const sfn = new StepFunctions({ apiVersion: "2016-11-23" });

exports.processor = async function (event: any) {
  const dynamo = new DynamoDB();
  let result: any | undefined = undefined;
  const msg = event.Record;
  // Store the event time as epoch milliseconds.
  const crt_time: number = new Date(msg.createdAt).getTime();
  const putData: PutItemInput = {
    TableName: process.env.MESSAGES_TABLE_NAME || "",
    Item: {
      messageId: { S: msg.messageId },
      createdAt: { N: `${crt_time}` },
      event: { S: JSON.stringify(msg.event) },
    },
    ReturnConsumedCapacity: "TOTAL",
  };
  try {
    result = await dynamo.putItem(putData).promise();
  } catch (err) {
    const sendFailure: StepFunctions.SendTaskFailureInput = {
      error: JSON.stringify(err),
      cause: JSON.stringify({
        statusCode: 500,
        headers: { "Content-Type": "text/json" },
        putStatus: {
          messageId: msg.messageId,
          ProcessorResult: err,
        },
      }),
      taskToken: event.MyTaskToken,
    };

    // Await the promise form (no callback) so the lambda does not return
    // before the failure callback has been delivered.
    const failureData = await sfn.sendTaskFailure(sendFailure).promise();
    console.log(failureData);
    return sendFailure;
  }

  const sendSuccess: StepFunctions.SendTaskSuccessInput = {
    output: JSON.stringify({
      statusCode: 200,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: msg.messageId,
        ProcessorResult: result,
      },
    }),
    taskToken: event.MyTaskToken,
  };
  // Use only the promise form here; combining a callback with .promise()
  // can send the request twice.
  const successData = await sfn.sendTaskSuccess(sendSuccess).promise();
  console.log(successData);
  return sendSuccess;
};

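
One detail worth a small sketch is how the error is serialized for the failure callback. JSON.stringify on a plain Error yields "{}" because its name and message are not enumerable, so it can help to pass those fields explicitly. The helpers below are hypothetical and not part of the lambda above; the field names follow the SendTaskSuccess and SendTaskFailure request shapes, and the 256-character cap on error reflects my understanding of the API limit, so treat it as an assumption to verify.

```typescript
// Request shapes reduced to the fields used in this article.
interface SuccessInput { taskToken: string; output: string }
interface FailureInput { taskToken: string; error: string; cause: string }

// Hypothetical helper: builds the SendTaskSuccess input.
function buildSuccessInput(taskToken: string, messageId: string, result: unknown): SuccessInput {
  return {
    taskToken,
    output: JSON.stringify({ statusCode: 200, putStatus: { messageId, ProcessorResult: result } }),
  };
}

// Hypothetical helper: builds the SendTaskFailure input from any thrown value.
function buildFailureInput(taskToken: string, messageId: string, err: unknown): FailureInput {
  const name = err instanceof Error ? err.name : "UnknownError";
  const message = err instanceof Error ? err.message : String(err);
  return {
    taskToken,
    error: name.slice(0, 256), // assumed length limit of the error field
    cause: JSON.stringify({ statusCode: 500, putStatus: { messageId, ProcessorResult: message } }),
  };
}

const sampleFailure = buildFailureInput("example-token", "sample-id", new Error("boom"));
console.log(sampleFailure.error); // prints "Error"
console.log(sampleFailure.cause);
```

Keeping the builders free of SDK calls also makes them easy to unit test without AWS credentials.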

Defining the lambda inside the stack

Here we will be using the above code asset to define the lambda resource inside our CDK stack.


const messageRecorder = new lambda.Function(this, "MessageRecorderHandler", {
      runtime: lambda.Runtime.NODEJS_14_X,
      code: lambda.Code.fromAsset("lambda"),
      handler: "message-recorder.processor",
      logRetention: logs.RetentionDays.ONE_MONTH,
      environment: {
        MESSAGES_TABLE_NAME: envParams.messages.tableName || "",
      },
    });

messageRecorder.applyRemovalPolicy(RemovalPolicy.DESTROY);

Implementing the new sfn construct inside our stack 🌴

Importing the construct library created earlier.

import {simpleSfnConstruct} from "../constructs/sfn-simple"

Passing the required params and getting an instance object reference by instantiating the construct.

const sfnMachine = new simpleSfnConstruct(this, 'sfnMachine', {
      timeout: Duration.seconds(30),
      triggerFunction: messageRecorder 
})

Event Target to statemachine 📒

const sfnRole = new Role(this, 'Role', {
      assumedBy: new ServicePrincipal('events.amazonaws.com'),
});
const sfnCommonEventTarget = new eventTargets.SfnStateMachine(sfnMachine.sfnMachine,{
      deadLetterQueue: commonEventProcessorQueueDLQ.queue,
      retryAttempts: 3,
      input: RuleTargetInput.fromEventPath("$"),
      role: sfnRole
})

New event rule for the event target 🔩

In this event rule, we use the new bus, commonBus, and the same eventPattern as before to forward the events to the state machine defined above.

const sfnEventRule = new Rule(this, `sfnCommonEventProcessorRule`, {
      eventBus: commonBus,
      eventPattern: { source: [`com.devpost.commonevent`] },
      targets: [sfnCommonEventTarget],
      ruleName: "sfnCommonEventProcessorRule",
      enabled: true
});
sfnEventRule.applyRemovalPolicy(RemovalPolicy.DESTROY);
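
For intuition about which events this rule forwards, here is a deliberately simplified sketch of source matching in plain TypeScript. Real event patterns support many more fields and operators; the interfaces and the matchesSource helper are assumptions for illustration only.

```typescript
// Only the "source" part of an event pattern is modelled here.
interface SourcePattern { source: string[] }
interface SourcedEvent { source: string }

// An event matches when its source equals any value listed in the pattern.
function matchesSource(pattern: SourcePattern, event: SourcedEvent): boolean {
  return pattern.source.some((allowed) => allowed === event.source);
}

const rulePattern: SourcePattern = { source: ["com.devpost.commonevent"] };

console.log(matchesSource(rulePattern, { source: "com.devpost.commonevent" })); // prints true
console.log(matchesSource(rulePattern, { source: "aws.ec2" })); // prints false
```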

Testing with postman 🎿

Once the solution is deployed to my AWS environment, I test it by sending a message to the API endpoint.

Querying the dynamodb table with the messageId looks as follows.

SELECT * FROM "MessagesTable"  where messageId = 'cdd51245-987a-b3c7-eecf-6d6d63046073'

We can find the message in the dynamodb now.
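
If you prefer to run the same lookup from code, a parameterized PartiQL statement avoids pasting the id into the query string. The sketch below only builds the request object; the interface and the messageByIdQuery helper are hypothetical, and the shape is meant to follow the DynamoDB ExecuteStatement input (Statement plus Parameters), which you should confirm against the SDK version you use.

```typescript
// Minimal request shape, reduced to what this lookup needs.
interface ExecuteStatementInput {
  Statement: string;
  Parameters: { S: string }[];
}

// Hypothetical helper: builds a parameterized lookup by messageId.
function messageByIdQuery(tableName: string, messageId: string): ExecuteStatementInput {
  return {
    Statement: `SELECT * FROM "${tableName}" WHERE messageId = ?`,
    Parameters: [{ S: messageId }],
  };
}

const lookup = messageByIdQuery("MessagesTable", "cdd51245-987a-b3c7-eecf-6d6d63046073");
console.log(lookup.Statement);
```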


Inspecting from AWS console 🎳

You can now check the workflow progress and execution logs from the AWS console.

Find with the event messageId

Thus we have defined a new state machine and attached a new rule to our existing event bus, which delivers messages to the state machine that we have built in this article.

We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.

⏭ We have our next article in serverless, do check out

🎉 Thanks for supporting! 🙏

Would be great if you like to ☕ Buy Me a Coffee, to help boost my efforts.
