DEV Community

Aravind V

Originally published at devpost.hashnode.dev

📱 AWS CDK 101 - 👯 Fetching JSON from S3 through stepfunction

🔰 If you are new to AWS CDK, please go through my previous articles in this series one by one.

In case you missed my previous article, you can find it at the links below.

Original previous post at 🔗 Dev Post

Reposted previous post at 🔗 dev to @aravindvcyber

In this article, let us refactor one of our previous step functions. It invoked a Lambda function that put records into DynamoDB using the full message from the event data; now the Lambda will instead read the message data from S3, using the bucket and key passed in the event data. This is also a direct follow-up to my previous article mentioned above.

Benefits of using S3 data ⌛

These are not the only benefits; they are just what I have observed in my deployments.

  • Reading the data from S3 keeps the event detail payload consistently small, since it only carries a reference to the object rather than the message itself.

  • When a huge message is pumped into API Gateway, S3 acts as the place to capture the full message payload, and only the S3 object identifier is pushed into the event detail, so we are not pushing large payloads through the actual moving parts.

  • This also makes it simple to preprocess the received message, for example by checking its format and data integrity, transforming it, or even scanning it for viruses.

  • S3 itself can trigger various event-driven actions asynchronously through its event notifications.

  • Passing the event details into Step Functions is now much more efficient, as we no longer need to worry about the message size approaching the 256 KiB payload limit of Step Functions.

  • Here we are using S3 as the staging store; if we need higher performance and throughput, we could switch to DynamoDB for storing this staging data.
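The first two bullets describe what is often called the claim-check pattern: the large payload is parked in S3 and only a small reference travels through the pipeline. The sketch below, in plain TypeScript with no AWS calls, compares a full message against such a reference using the 256 KiB Step Functions payload limit. `StagedMessageRef`, `fitsInStateMachine`, and the bucket and key values are hypothetical names chosen for illustration.

```typescript
// Step Functions caps state and task payloads at 256 KiB of UTF-8 text.
const SFN_PAYLOAD_LIMIT_BYTES = 256 * 1024;

// Counts UTF-8 bytes without depending on Node or browser globals.
function utf8Length(s: string): number {
  let bytes = 0;
  for (let i = 0; i < s.length; i++) {
    const cp = s.codePointAt(i) as number;
    if (cp >= 0x10000) {
      bytes += 4;
      i++; // skip the low surrogate of this pair
    } else {
      bytes += cp < 0x80 ? 1 : cp < 0x800 ? 2 : 3;
    }
  }
  return bytes;
}

// Hypothetical claim-check reference: where the full message was staged.
interface StagedMessageRef {
  bucket: string;
  key: string;
}

// True when the serialized payload stays within the Step Functions limit.
function fitsInStateMachine(payload: unknown): boolean {
  return utf8Length(JSON.stringify(payload)) <= SFN_PAYLOAD_LIMIT_BYTES;
}
```

Whatever the size of the original message, the reference stays a few dozen bytes, which is why the event detail no longer grows with the message.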

Construction Plan 🏗️

As mentioned earlier, we want to read a JSON object from S3 inside our message recorder Lambda function, which we refined in previous articles to write to DynamoDB. To do that, we first create an S3 client using the AWS SDK:

import { S3 } from "aws-sdk";

const s3 = new S3();

Refactoring the put operation inside the Lambda into a helper function 🎊

We have refactored the previously used message recorder function, moving the DynamoDB put into a helper for better reuse of the components, as shown below.


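import { DynamoDB } from "aws-sdk";

// Writes a single message into the messages table and returns the
// putItem response (including the consumed capacity) to the caller.
const dbPut = async (Record: any, msg: any): Promise<DynamoDB.PutItemOutput> => {
  const dynamo = new DynamoDB();
  const crt_time: number = new Date(msg.createdAt).getTime();
  const putData: DynamoDB.PutItemInput = {
    TableName: process.env.MESSAGES_TABLE_NAME || "",
    Item: {
      messageId: { S: msg.messageId },
      // DynamoDB number attributes are sent as strings
      createdAt: { N: `${crt_time}` },
      event: { S: msg.event },
    },
    ReturnConsumedCapacity: "TOTAL",
  };

  console.log("putData", JSON.stringify(putData, undefined, 2));

  // Return the response so callers awaiting dbPut actually receive it
  return dynamo.putItem(putData).promise();
};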

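The item-building part of dbPut can be pulled out into a pure function, which makes it easy to unit test without DynamoDB. This is a minimal sketch under assumptions: `toMessageItem`, `RecordedMessage`, and the local `AttributeValue` type are hypothetical stand-ins (the real type lives in the AWS SDK). It also adds a guard the original does not have, rejecting an unparseable `createdAt` instead of sending `"NaN"`, which DynamoDB does not accept as a number.

```typescript
// Minimal local stand-in for the DynamoDB AttributeValue shapes used here
// (string and number attributes only), defined locally to stay runnable.
type AttributeValue = { S: string } | { N: string };

interface RecordedMessage {
  messageId: string;
  createdAt: string; // any string Date can parse, e.g. an ISO 8601 timestamp
  event: string; // the JSON text read from S3
}

// Builds the Item map that dbPut writes. Numbers travel as strings, and an
// unparseable timestamp is rejected before any request is made.
function toMessageItem(msg: RecordedMessage): Record<string, AttributeValue> {
  const createdAtMs = new Date(msg.createdAt).getTime();
  if (Number.isNaN(createdAtMs)) {
    throw new Error(`Invalid createdAt for ${msg.messageId}: ${msg.createdAt}`);
  }
  return {
    messageId: { S: msg.messageId },
    createdAt: { N: String(createdAtMs) },
    event: { S: msg.event },
  };
}
```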

Refactoring the sendSuccess Helper 🔮

We will also use this opportunity to refactor the existing SendTaskSuccess statement into a dedicated function that we can reuse efficiently.


import { StepFunctions } from "aws-sdk";

const sfn = new StepFunctions();

// Reports a successful task back to Step Functions using the task token,
// so the waiting state machine execution can continue.
const funcSuccess = async (res: any, mid: string, token: string) => {
  console.log("sending success ", { res });
  const sendSuccess: StepFunctions.SendTaskSuccessInput = {
    output: JSON.stringify({
      statusCode: 200,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: mid,
        ProcessorResult: res,
      },
    }),
    taskToken: token,
  };
  // Await the request so the Lambda does not return before it completes
  const resultStatus = await sfn.sendTaskSuccess(sendSuccess).promise();
  console.log("sent success: ", { resultStatus, sendSuccess });
};
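One design option, shown here as a hypothetical sketch rather than the code used in this series, is to inject the Step Functions client into the helper so it can be tested with a fake. `TaskSuccessClient` and `reportSuccess` are illustrative names; the interface only mirrors the aws-sdk v2 call shape `sendTaskSuccess(params).promise()`, which a real `StepFunctions` client should satisfy structurally.

```typescript
// Hypothetical minimal interface mirroring sendTaskSuccess(params).promise().
interface SendTaskSuccessParams {
  output: string;
  taskToken: string;
}

interface TaskSuccessClient {
  sendTaskSuccess(params: SendTaskSuccessParams): { promise(): Promise<unknown> };
}

// Same payload as funcSuccess, but the client is injected and the call awaited.
async function reportSuccess(
  client: TaskSuccessClient,
  res: unknown,
  mid: string,
  token: string
): Promise<SendTaskSuccessParams> {
  const params: SendTaskSuccessParams = {
    output: JSON.stringify({
      statusCode: 200,
      headers: { "Content-Type": "text/json" },
      putStatus: { messageId: mid, ProcessorResult: res },
    }),
    taskToken: token,
  };
  await client.sendTaskSuccess(params).promise();
  return params;
}
```

In a unit test, a fake client can simply record the parameters it receives, as the usage below does.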

Refactoring the sendFailure Helper

Similarly, we will refactor the sendTaskFailure call into a new helper function.

// Reports a failed task back to Step Functions using the task token,
// so the waiting execution can move to its failure path.
const funcFailure = async (err: any, mid: string, token: string) => {
  console.log("sending failure ", { err });
  const sendFailure: StepFunctions.SendTaskFailureInput = {
    // The error field accepts at most 256 characters
    error: JSON.stringify(err ?? "UnknownError").slice(0, 256),
    cause: JSON.stringify({
      statusCode: 500,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: mid,
        ProcessorResult: err,
      },
    }),
    taskToken: token,
  };
  // Await the request so the Lambda does not return before it completes
  const resultStatus = await sfn.sendTaskFailure(sendFailure).promise();
  console.log("sent failure: ", { resultStatus, sendFailure });
};
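The SendTaskFailure API limits `error` to 256 characters and `cause` to 32,768 characters, and `JSON.stringify` on an `Error` instance yields `"{}"` because `message` is not an enumerable property. The hypothetical `buildFailureParams` below is a sketch of how the failure payload could be built with both points in mind; the `ProcessingError` fallback name is an assumption.

```typescript
// Documented SendTaskFailure limits for the error and cause strings.
const MAX_ERROR_LENGTH = 256;
const MAX_CAUSE_LENGTH = 32768;

interface SendTaskFailureParams {
  error: string;
  cause: string;
  taskToken: string;
}

function buildFailureParams(err: unknown, mid: string, token: string): SendTaskFailureParams {
  // Copy name and message explicitly, since Error instances serialize to "{}".
  const detail = err instanceof Error ? { name: err.name, message: err.message } : err;
  const errorName = err instanceof Error ? err.name : "ProcessingError";
  const cause = JSON.stringify({
    statusCode: 500,
    headers: { "Content-Type": "text/json" },
    putStatus: { messageId: mid, ProcessorResult: detail },
  });
  return {
    error: errorName.slice(0, MAX_ERROR_LENGTH),
    cause: cause.slice(0, MAX_CAUSE_LENGTH),
    taskToken: token,
  };
}
```

Truncating `cause` can cut the JSON short, so a consumer of the failure should treat it as text.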

Granting read object access to the Lambda

In our previous article, we granted the entry handler access to put objects into the bucket; here we grant the recorder function access to read them as follows.


stgMsgBucket.grantWrite(eventCounterBus.handler);
stgMsgBucket.grantRead(messageRecorder);


(Screenshot: S3 read access)

Changes inside the Lambda

The changes below are made inside the Lambda handler so that it reads every message using the event.detail content, which now carries the bucket name and object key instead of the message itself.

Here you can see that the bucket name and object key are extracted from the message and used to retrieve the object from S3; its content is then recorded in DynamoDB using the helper functions created above.

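await Promise.all(
  event.Records.map(async (Record: any) => {
    console.log("Received message:", JSON.stringify(Record, undefined, 2));
    const body = JSON.parse(Record.body);
    // The state machine now sends only the S3 reference and the task token
    const msg = body.Record;
    const token = body.MyTaskToken;
    try {
      // Fetch the staged JSON from S3 using the bucket and key from the event
      const s3Get = await s3
        .getObject({
          Bucket: msg.bucket,
          Key: msg.key,
        })
        .promise();
      const data = s3Get.Body?.toString("utf-8");
      if (!data) {
        throw new Error(`Empty object at s3://${msg.bucket}/${msg.key}`);
      }
      msg.event = data;
      // Record in DynamoDB, then report success to the waiting execution
      const putResult = await dbPut(Record, msg);
      await funcSuccess(putResult, msg.messageId, token);
    } catch (err: any) {
      // Report S3 or DynamoDB failures so the execution is not left waiting
      await funcFailure(err, msg.messageId, token);
    }
  })
);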

Once we get the object from S3, we update the message object with its content and insert it into DynamoDB with the dbPut helper we refactored earlier.
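Since the state machine now sends only a reference, it can help to validate the record body before calling S3 at all. The sketch below is hypothetical: `parseRecordBody`, `StagedRecord`, and `ParsedTask` are illustrative names, and the field list assumes the `Record` shape built by the state machine task payload (`messageId`, `createdAt`, `bucket`, `key`) together with `MyTaskToken`.

```typescript
// Hypothetical types for what the state machine is assumed to send.
interface StagedRecord {
  messageId: string;
  createdAt: string;
  bucket: string;
  key: string;
}

interface ParsedTask {
  msg: StagedRecord;
  token: string;
}

// Parses and validates a queue record body so malformed input fails fast.
function parseRecordBody(body: string): ParsedTask {
  const parsed = JSON.parse(body);
  const msg = parsed?.Record;
  const token = parsed?.MyTaskToken;
  // Without a token no failure can be reported back, so throw immediately.
  if (typeof token !== "string" || token.length === 0) {
    throw new Error("MyTaskToken is missing from the record body");
  }
  for (const field of ["messageId", "createdAt", "bucket", "key"] as const) {
    if (typeof msg?.[field] !== "string" || msg[field].length === 0) {
      throw new Error(`Record.${field} is missing or empty`);
    }
  }
  return { msg: msg as StagedRecord, token };
}
```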

Removing the event content from the state machine invocation payload

Now we can stop passing the actual message body through the entire pipeline, which reduces the size of the payload transmitted during the various invocation calls.

const sfnTaskPayload = sfn.TaskInput.fromObject({
  MyTaskToken: sfn.JsonPath.taskToken,
  Record: {
    "messageId.$": "$.id",
    "createdAt.$": "$.time",
    // The full message is no longer passed through the state machine:
    // "event.$": "States.StringToJson($.detail.message)",
    // "event.$": "$.detail.message",
    "bucket.$": "$.detail.message.bucket",
    "key.$": "$.detail.message.key",
  },
});

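To make the JsonPath mapping concrete, here is a plain TypeScript illustration, not the Step Functions runtime, of what the four paths resolve to for an incoming event. `StagedEvent` and `resolveTaskInput` are hypothetical, and the event shape is inferred from the paths themselves; at run time Step Functions fills in the task token.

```typescript
// Hypothetical event shape inferred from the JsonPath expressions above.
interface StagedEvent {
  id: string;
  time: string;
  detail: { message: { bucket: string; key: string } };
}

// Mimics "$.id", "$.time", "$.detail.message.bucket" and "$.detail.message.key".
function resolveTaskInput(event: StagedEvent, taskToken: string) {
  return {
    MyTaskToken: taskToken,
    Record: {
      messageId: event.id,
      createdAt: event.time,
      bucket: event.detail.message.bucket,
      key: event.detail.message.key,
    },
  };
}
```

The resolved `Record` has no `event` field any more, so its size no longer depends on the size of the original message.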

We also update the entry handler function discussed in the last article, as shown below, so that the message body is dropped from the event since S3 holds the data.

const message = JSON.parse(event.body);
message.uuid = getUuid();
message.handler = context.awsRequestId;
message.key = `uploads/${message.uuid}.json`;
message.bucket = process.env.BucketName || "";
console.log("Initial request:", JSON.stringify(message, undefined, 2));
delete message.message; // new line added, since S3 will have the data
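The staging step can be sketched as a pure function that returns both what goes to S3 and what is published as the event. This is a hypothetical sketch: `stageMessage` and `StagedPayload` are illustrative names, the uuid, request id, and bucket are passed in rather than read from `getUuid()`, `context`, or the environment, and it assumes the S3 object holds the full message object as it stands just before `delete message.message`. The actual upload code is in the previous article.

```typescript
interface StagedPayload {
  objectKey: string;
  objectBody: string; // what would be written to S3
  eventMessage: Record<string, unknown>; // what would be published as the event
}

function stageMessage(rawBody: string, uuid: string, requestId: string, bucket: string): StagedPayload {
  const message = JSON.parse(rawBody);
  message.uuid = uuid;
  message.handler = requestId;
  message.key = `uploads/${uuid}.json`;
  message.bucket = bucket;
  // Snapshot the full payload for S3 before trimming the event.
  const objectBody = JSON.stringify(message);
  // Mirrors `delete message.message`: the event keeps only the reference.
  delete message.message;
  return { objectKey: message.key, objectBody, eventMessage: message };
}
```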

Integration testing

(Screenshots: new message, Step Functions instance, S3 data, and final destination in DynamoDB.)

Key point: async-await 🔔

One important thing to learn here is how we have handled async-await to fetch from S3 first and then write to DynamoDB, among other async operations.

Beginners commonly go wrong with async-await and end up with execution leaks when callbacks are implemented poorly, for example when the handler returns before a callback-based SDK call has completed.

In such cases, the trace below can help you understand where the problem lies.

(Screenshot: execution trace)
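To see what an execution leak looks like, the sketch below simulates a callback-style SDK call with `setTimeout`; `fakeSdkCall`, `leakyHandler`, and `safeHandler` are hypothetical names. The leaky version returns before the callback fires, and in Lambda the work may then not complete within that invocation. The safe version wraps the callback in a Promise and awaits it, the same idea as awaiting `.promise()` on an aws-sdk v2 request.

```typescript
// Simulated callback-style SDK call that completes asynchronously.
function fakeSdkCall(log: string[], cb: (err: Error | null) => void): void {
  setTimeout(() => {
    log.push("sdk call finished");
    cb(null);
  }, 10);
}

// Leaky: starts the call but resolves immediately, without waiting for it.
async function leakyHandler(log: string[]): Promise<void> {
  fakeSdkCall(log, () => {});
  log.push("handler returned");
}

// Safe: wraps the callback in a Promise and awaits it before returning.
async function safeHandler(log: string[]): Promise<void> {
  await new Promise<void>((resolve, reject) =>
    fakeSdkCall(log, (err) => (err ? reject(err) : resolve()))
  );
  log.push("handler returned");
}
```

When the leaky handler resolves, its log holds only "handler returned"; the safe handler's log shows the SDK call finishing first.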

We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.

โญ We have our next article in serverless, do check out

https://dev.to/aravindvcyber/aws-cdk-101-fetching-json-from-dynamodb-vs-s3-through-stepfunction-1mb7

🎉 Thanks for supporting! 🙏

It would be great if you'd like to ☕ Buy Me a Coffee to help boost my efforts.


Original post at 🔗 Dev Post

Reposted at 🔗 dev to @aravindvcyber


