DEV Community

AWS Lambda SQS events with Serverless Framework

Marcin Piczkowski ・7 min read

drawn with [cloudcraft.co](http://cloudcraft.co)

This post was initially published at Cloudly.Tech


On 28 June 2018 Amazon announced support for SQS events in Lambda.
This greatly simplifies the development of serverless message-driven systems.
Previously, if we wanted to react to SQS messages in Lambda, we had to poll the queue at regular intervals.
The Lambda function had to be triggered by a cron scheduler to check whether any new messages had appeared in the SQS queue.
If there was nothing new, the invocation was a waste of resources and money.

Now we no longer need a scheduler: the Lambda function is invoked automatically when a new message appears in the SQS queue.

In this post I will show how we can use this new feature in Serverless Framework.

If you have not heard of this framework yet, you can have a look at its very good documentation here.


SIDE NOTE:
At the time of writing, SQS events are not yet supported by the released Serverless Framework version 1.27.3, but there is a PR ready, so support should be added in the next release.
If you are impatient like me, you can pull the branch from GitHub and build it yourself:

npm install

Then you can use the bin/serverless script to create and deploy the project.


I can imagine many use cases for a message-driven architecture. For example, in an order processing system a message could be sent to another service after a customer places an order. The customer would be notified immediately that the order was submitted, while the actual processing of the order happens asynchronously (in a message-driven manner).

I tried to keep this example as simple as possible and not to dive deep into any specific business scenario.

We will have only two functions:

  • sender - triggered by a REST API call; submits a new message to the SQS queue
  • receiver - processes messages from the SQS queue

Let's create a new project from the aws-nodejs template:

serverless create --template aws-nodejs

This generates two files:

  • serverless.yml - a definition of the new service stack on AWS (using CloudFormation underneath)
  • handler.js - a demo function

We can rename handler.js to receiver.js and slightly update it.

After updating the generated sources for our demo, they look as follows:

serverless.yml

service: sqs-triggers-demo

provider:
  name: aws
  runtime: nodejs6.10
  profile: sls
  region: us-east-1
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "sqs:SendMessage"
        - "sqs:GetQueueUrl"
      Resource: "arn:aws:sqs:${self:provider.region}:811338114632:MyQueue"
    - Effect: "Allow"
      Action:
        - "sqs:ListQueues"
      Resource: "arn:aws:sqs:${self:provider.region}:811338114632:*"


functions:
  sender:
    handler: sender.handler
    events:
      - http:
          path: v1/sender
          method: post
  receiver:
    handler: receiver.handler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - MyQueue
              - Arn

resources:
  Resources:
    MyQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "MyQueue"

Let me explain what is happening here.
First, we define our service name, sqs-triggers-demo, and specify the region in which we want to create it (us-east-1).
Next, we grant permission to send messages to the SQS queue, which the sender function needs.

  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "sqs:SendMessage"
        - "sqs:GetQueueUrl"
      Resource: "arn:aws:sqs:${self:provider.region}:811338114632:MyQueue"
    - Effect: "Allow"
      Action:
        - "sqs:ListQueues"
      Resource: "arn:aws:sqs:${self:provider.region}:811338114632:*"

The queue name MyQueue is hard-coded here. In production code we would probably want to pass it in from environment properties.
I have also hard-coded the account ID for simplicity, but it should be passed as a property too. However, the syntax for that is a bit ugly and would distract readers from the main subject. You can have a look here how to parametrize it.
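
On the function side, the same idea can be sketched roughly as follows. This is only an illustration, not part of the demo sources: the environment variable name `QUEUE_NAME` and the helper `buildQueueUrl` are my own assumptions, and the region and account ID are taken from the function ARN, which has the form `arn:aws:lambda:<region>:<account-id>:function:<name>`.

```javascript
// Builds an SQS queue URL from the invoking function's ARN and a queue name.
// buildQueueUrl is a hypothetical helper, not part of the demo sources.
function buildQueueUrl(invokedFunctionArn, queueName) {
    const parts = invokedFunctionArn.split(':');
    const region = parts[3];     // e.g. "us-east-1"
    const accountId = parts[4];  // 12-digit AWS account ID
    return 'https://sqs.' + region + '.amazonaws.com/' + accountId + '/' + queueName;
}

// QUEUE_NAME is an assumed environment variable; fall back to the demo's queue name.
const queueName = process.env.QUEUE_NAME || 'MyQueue';

console.log(buildQueueUrl('arn:aws:lambda:us-east-1:123456789012:function:sender', queueName));
```

In a handler, the first argument would be `context.invokedFunctionArn`, exactly as sender.js already does to obtain the account ID.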

Then we specify two functions, sender and receiver, along with their handlers, which are implemented in the files sender.js and receiver.js.
Each file exports one function named handler.

functions:
  sender:
    handler: sender.handler
    events:
      - http:
          path: v1/sender
          method: post
  receiver:
    handler: receiver.handler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - MyQueue
              - Arn

The receiver function has an SQS event defined, so it will be triggered by messages arriving in MyQueue.
The event needs the ARN of the queue. Here it is resolved from the queue's logical ID, but it could also be given explicitly:

  receiver:
    handler: receiver.handler
    events:
      - sqs:
          arn: "arn:aws:sqs:${self:provider.region}:811338114632:MyQueue"
          batchSize: 1

Optionally, we can define a batch size: the maximum number of SQS messages passed to the Lambda function in a single invocation (at the time of writing, the default and maximum value is 10).

The sqs event hooks up an existing SQS queue to a Lambda function; Serverless won't create a new queue. This is why we have a resources section at the end, which creates the queue for us.

resources:
  Resources:
    MyQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "MyQueue"

sender.js

var AWS = require('aws-sdk');
var sqs = new AWS.SQS({
    region: 'us-east-1'
});

exports.handler = function(event, context, callback) {
    var accountId = context.invokedFunctionArn.split(":")[4];
    var queueUrl = 'https://sqs.us-east-1.amazonaws.com/' + accountId + '/MyQueue';

    // response and status of HTTP endpoint
    var responseBody = {
        message: ''
    };
    var responseCode = 200;

    // SQS message parameters
    var params = {
        MessageBody: event.body,
        QueueUrl: queueUrl
    };

    sqs.sendMessage(params, function(err, data) {
        if (err) {
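            console.log('error:', 'failed to send message: ' + err);
            // Assign the outer variable. Re-declaring it with `var` here would
            // shadow it and leave statusCode undefined on the success path.
            responseCode = 500;
            responseBody.message = 'Failed to send message';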
        } else {
            console.log('data:', data.MessageId);
            responseBody.message = 'Sent to ' + queueUrl;
            responseBody.messageId = data.MessageId;
        }
        var response = {
            statusCode: responseCode,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(responseBody)
        };

        callback(null, response);
    });
}

At the top of the file we import the AWS SDK, which is used to send messages to SQS.
Because we want to call this function through API Gateway, it needs to return a compliant response of this form:

{
    statusCode: <HTTP status code>,
    headers: {
        <a map of HTTP headers>
    },
    body: <response body as string>
};


What we send to SQS is the HTTP request body.
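
The response shape described above can be produced by a small helper. This is a minimal sketch; `buildResponse` is a hypothetical name and not part of the demo sources.

```javascript
// Builds a response in the shape expected by API Gateway's Lambda proxy integration.
// buildResponse is a hypothetical helper, not part of the demo sources.
function buildResponse(statusCode, payload) {
    return {
        statusCode: statusCode,
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload) // the body must be a string, not an object
    };
}

const ok = buildResponse(200, { message: 'Sent' });
console.log(ok.body);
```

Returning an object without a numeric statusCode or with a non-string body is a typical cause of the "Malformed Lambda proxy response" error from API Gateway.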

receiver.js

exports.handler = (event, context, callback) => {
    const response = {
        statusCode: 200,
        body: JSON.stringify({
            message: 'SQS event processed.',
            input: event,
        }),
    };

    console.log('event: ', JSON.stringify(event));

    var body = event.Records[0].body;
    console.log("text: ", JSON.parse(body).text);

    callback(null, response);
};


Here we do nothing more than parse the event received from SQS and extract the HTTP body passed on by the sender function:

var body = event.Records[0].body;

The event contains Records, an array of SQS messages. Its size depends on the batch size specified for the SQS event on the Lambda function (batchSize in serverless.yml) and on the number of messages in the queue.
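
The demo only reads Records[0]. With a batch size greater than 1, every record would need to be handled. A minimal sketch of that, where `extractTexts` and the trimmed-down `sampleEvent` are hypothetical and only carry the fields used here:

```javascript
// Returns the `text` field of every SQS record in the event, in order.
// extractTexts is a hypothetical helper, not part of the demo sources.
function extractTexts(event) {
    return event.Records.map(function (record) {
        return JSON.parse(record.body).text;
    });
}

// A trimmed-down event with two records, as it might arrive with batchSize > 1.
const sampleEvent = {
    Records: [
        { messageId: 'id-1', body: '{"text": "Hello"}' },
        { messageId: 'id-2', body: '{"text": "world!"}' }
    ]
};

console.log(extractTexts(sampleEvent));
```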

Then I expect the HTTP body to have a text property and print it out:

console.log("text: ", JSON.parse(body).text);

Of course, this will fail if there is no such property, but we will test it with the right data :)

Now, we can deploy the application and print info to get the endpoint URL:

serverless deploy
serverless info

It prints something like:

Service Information
service: sqs-triggers-demo
stage: dev
region: us-east-1
stack: sqs-triggers-demo-dev
api keys:
  None
endpoints:
  POST - https://fochwxb6gh.execute-api.us-east-1.amazonaws.com/dev/v1/sender
functions:
  sender: sqs-triggers-demo-dev-sender
  receiver: sqs-triggers-demo-dev-receiver

Then tail the logs with the command:

serverless logs -f receiver -t

In another console, send an HTTP request to the sender endpoint like this:

curl -d '{"text" : "Hello world!"}' https://fochwxb6gh.execute-api.us-east-1.amazonaws.com/dev/v1/sender

The receiver logs would look similar to these:

START RequestId: 384a3c5c-5689-59ba-a04a-6fef0588bae9 Version: $LATEST
2018-07-02 00:08:21.571 (+02:00)    384a3c5c-5689-59ba-a04a-6fef0588bae9    event:  {"Records":[{"messageId":"df38c9a3-6562-4dd4-b978-98ac4dce42d0","receiptHandle":"AQEBF/CZlWiLwMpUr8GqE4a2ns7nIBv1nAlD/MJw4HCxYlyzNoSPJxjc+J7Rn34IdKq8wAH+I1e8eUW6ZuVEAat6pD4hZ2WYO+oCgnVFTQMF59zTkT7Miw3F36pR8SxywXAnKUtmCdpeVv40Zz+KYjwlP6VooSIrtCFRuLFFocfjKhBVdKb/9B7Rs5ZgB+QOYgB1bBEfaLfzWNQdFk1bBnDx9BSwQ+9mTMe2wgDIIwUpbSy8NxFZlMW2PzqUHm5JU7wz+dN7bKGVvfnh/URlXS+JNfs2IlXkpZIb+4kU5IG3ItBtQo3y1SzmsXlqczKShI9pgEmL4xtuC94tdNbX0+EgTz2NyX2vN8k4/2aafgw5JIefuiriqCmteesqFvT+awPV","body":"{\"text\" : \"Hello world!\"}","attributes":{"ApproximateReceiveCount":"1","SentTimestamp":"1530482900766","SenderId":"AROAINV2MGHNVAM5STGJY:sqs-triggers-demo-dev-sender","ApproximateFirstReceiveTimestamp":"1530482900780"},"messageAttributes":{},"md5OfBody":"63c34d70a5a1b50dba746843a0cefd5b","eventSource":"aws:sqs","eventSourceARN":"arn:aws:sqs:us-east-1:811338114632:MyQueue","awsRegion":"us-east-1"}]}
2018-07-02 00:08:21.571 (+02:00)    384a3c5c-5689-59ba-a04a-6fef0588bae9    text:  Hello world!
END RequestId: 384a3c5c-5689-59ba-a04a-6fef0588bae9
REPORT RequestId: 384a3c5c-5689-59ba-a04a-6fef0588bae9  Duration: 0.56 ms   Billed Duration: 100 ms     Memory Size: 1024 MB    Max Memory Used: 20 MB  


You may wonder how Lambda scales when we flood SQS with messages.
The Amazon documentation explains it:


For Lambda functions that process Amazon SQS queues, AWS Lambda will automatically scale the polling on the queue until the maximum concurrency level is reached, where each message batch can be considered a single concurrent unit. AWS Lambda's automatic scaling behavior is designed to keep polling costs low when a queue is empty while simultaneously enabling you to achieve high throughput when the queue is being used heavily.
Here is how it works:

  • When an Amazon SQS event source mapping is initially enabled, or when messages first appear after a period without traffic, Lambda begins polling the Amazon SQS queue using five clients, each making long poll requests in parallel.
  • Lambda monitors the number of inflight messages, and when it detects that this number is increasing, it will increase the polling frequency by 20 ReceiveMessage requests per minute and the function concurrency by 60 calls per minute. As long as the queue remains busy, scale up continues until at least one of the following occurs:
  • Polling frequency reaches 100 simultaneous ReceiveMessage requests and function invocation concurrency reaches 1,000.
  • The account concurrency maximum has been reached.
  • The per-function concurrency limit of the function attached to the SQS queue (if any) has been reached.

Note

Account-level limits are impacted by other functions in the account, and per-function concurrency applies to all events sent to a function. For more information, see Managing Concurrency.
When AWS Lambda detects that the number of inflight messages is decreasing, it will decrease the polling frequency by 10 ReceiveMessage requests per minute and decrease the concurrency used to invoke your function by 30 calls per minute.
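
To get a feeling for these numbers, here is a back-of-the-envelope calculation. It is only a rough sketch: it takes the figure of 60 additional calls per minute from the quote above and assumes, as my own simplification, that the function starts at 5 concurrent invocations (one batch per initial polling client). `minutesToScale` is a hypothetical helper.

```javascript
// Rough estimate of how many minutes of sustained load it takes to grow from
// `start` concurrent invocations to `target`, adding `stepPerMinute` each minute.
function minutesToScale(start, target, stepPerMinute) {
    if (target <= start) return 0;
    return Math.ceil((target - start) / stepPerMinute);
}

// Assumed starting point: 5 concurrent batches; cap and step taken from the quote.
console.log(minutesToScale(5, 1000, 60));
```

Under these assumptions it takes roughly a quarter of an hour of a continuously busy queue to reach the concurrency cap of 1,000.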


Fortunately, there is a way to limit concurrency per Lambda function with the reservedConcurrency property.
E.g. if we wanted to allow only one thread per receiver function, the function definition would look like this:

  receiver:
    handler: receiver.handler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - MyQueue
              - Arn
    reservedConcurrency: 1

That's it. You can find the source files for this demo on GitHub.
Don't forget to remove the service stack from AWS after you have finished playing with it:

serverless remove

I'm glad if you made it to the end of this post.
If you found it interesting, don't forget to like this article and follow me to be notified about similar ones in the future :)

Discussion (25)

Greg, The JavaScript Whisperer

Hey I can't find much about reservedConcurrency when you say limit to 1 thread, do lambdas share threads with more concurrency? Also if you limit the concurrency are the extra executions queued up for a bit or just fail? like if I call 100 lambda executions and there is a reservedConcurrency of 1, will it run 1 at a time until all 100 are up?
Cheers
Greg

Marcin Piczkowski Author

Greg, please have a look at this doc section "Throttling Behavior"

In short: when you exceed the reservedConcurrency limit, say you call 100 lambdas while the limit is set to 1, you will experience throttling, and whether Lambda retries your call or not depends on the source of the event. E.g. if you call Lambda from API Gateway, you will get the error straight from your API without any retry (you need to handle retries yourself in the API caller if you wish to).

Kostas Bariotis

After we experience throttling, what happens to the messages that cannot be processed? Are they going back to the queue? Does SQS consider them failed?

Marcin Piczkowski Author • Edited

If your Lambda function is invoked asynchronously, e.g. from other function, AWS Lambda automatically retries the throttled event for up to six hours, with delays between retries.

In the case of a Lambda invoked from SQS (poll-based), AWS Lambda will continue to retry until the MessageRetentionPeriod set for the queue expires or, if you have a redrive policy set on the queue, until MaxReceiveCount is reached, after which the message goes to the DLQ.

docs.aws.amazon.com/lambda/latest/...

Kostas Bariotis

Yep got you, problem is that you cannot have a normal sequential processing by a certain number, say by 1, and fill up the queue. In a similar case, I ended up setting the MaxReceiveCount to 1000 in order to achieve that. Which worked alright but brought other issues, e.g. the message must not fail cause it will just keep retrying for a long time.

Marcin Piczkowski Author

I'm not sure I get your problem. Would be nice if you could illustrate it with example project. By sequential you mean processing messages in order I suppose but for this you would need fifo queues or design the app differently, e.g. say you store the current processing state in db. When message2 arrives before message1 you check in db that message1 was not yet processed and you put message1 in another queue and have another lambda which will forward the message back to the original queue (then the receive count is 0 again), when message1 arrives next you process it and update db state so when next time you receive message2 you will know that you can process it already. You could also use the DLQ for that and the other lambda could pass messages from DLQ back to the original queue. Just an idea, don't know your particular case but maybe it works for you.

Kostas Bariotis

Ah no I wasn't talking about order but rather the regulation flow. So I want the messages to wait in the queue until the worker can process them but with SQS/Lambda the messages won't wait since they are experiencing throttling and will fail and will either be removed or go to the DLQ.

But I liked you technique on preserving order. I haven't tried it, maybe I will in the future.

Marcin Piczkowski Author

Hmm.. I see. So maybe you could try the same technique here. When throttled message goes to DLQ you have a lambda function running in some scheduled interval which you set by experimenting with throttle rate, and this lambda will forward the message from DLQ back to original queue from which it could possibly be processed again and hopefully not throttled again.

Kostas Bariotis

I just created this repo and remembered of our discussion. This is how I managed to create a sequential processing pipeline with AWS SQS and Lambda on a Serverless project.

Let me know what you think! Thank you.

github.com/kbariotis/sqs-lambda-co...

Marcin Piczkowski Author

Nice :) I like it.

I tried to find out what happens to the CloudWatch event which is throttled, if it is retried or dropped but I could not see in docs, but I suppose it is more a problem of AWS what to do with high number of cumulated CloudWatch events which are continuously retried, rather than your problem, since setting of concurrency limit on Lambda to 1 provides that no more than 1 instance of this function will execute at certain time.

Nice wrap up. Thanks.

Juan Siesquen • Edited

Hi Marcin,

I get an error when I try to make a POST request to the API Gateway URL:

This is the response:
502 Bad Gateway
{
"message": "Internal server error"
}

What happened?

I found nothing in CloudWatch Logs about an error in the REST resource.

Regards,

Juan Siesquen

I ran a test of the POST method in API Gateway and the log shows:
Execution failed due to configuration error: Malformed Lambda proxy response
Method completed with status: 502

But the response in my sender is well formed:

var response = {
    "statusCode": responseCode,
    "headers": {
        "Content-Type": "application/json"
    },
    "body": JSON.stringify(responseBody),
    "isBase64Encoded": false
};

Read this...
aws.amazon.com/es/premiumsupport/k...

But nothing! wuu

Marcin Piczkowski Author

Can you put your whole handler code somewhere, e.g
on gist to check how the function looks like?

Juan Siesquen

I just re-created sender.js and it works well. I think something was typed wrong. The important thing is that I could check how the data exchange between Lambda and SQS works. Thank you!

Only a tiny addition:
if (err) {
console.log('error:', "failed to send message" + err);
responseCode = 500;
responseBody = err;
...

I did not understand the concurrency part yet. If my Lambda has more than 2000 records to send to SQS, I will send them one by one to the queue. I think the Lambda will take a long time; can I send them in batches? Each record is a message to the queue.

Maybe you could enlighten us with an additional post about this?

Thank you so much Marcin!

Marcin Piczkowski Author

You are allowed to send at most 10 messages in a single batch: docs.aws.amazon.com/AWSSimpleQueue...

I would delegate this task to multiple lambdas if you need to send batches concurrently to speed things up.
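
A minimal sketch of how records could be split into batches of 10 before sending. The helpers `chunk` and `toBatchEntries` are hypothetical; each resulting array is meant to be used as the Entries parameter of the AWS SDK's sqs.sendMessageBatch call, which is not invoked here.

```javascript
// Splits an array into chunks of at most `size` items.
function chunk(items, size) {
    const chunks = [];
    for (let i = 0; i < items.length; i += size) {
        chunks.push(items.slice(i, i + size));
    }
    return chunks;
}

// Turns one chunk of records into an `Entries` array for sqs.sendMessageBatch.
// Each entry needs an Id that is unique within its batch.
function toBatchEntries(records) {
    return records.map(function (record, index) {
        return { Id: String(index), MessageBody: JSON.stringify(record) };
    });
}

// 25 sample records end up in three batches of 10, 10 and 5 entries.
const records = [];
for (let n = 0; n < 25; n++) records.push({ n: n });

const batches = chunk(records, 10).map(toBatchEntries);
console.log(batches.length);
```

With 2000 records this yields 200 batches, so 200 sendMessageBatch calls instead of 2000 sendMessage calls.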

Emanuel Müller Ramos

Thanks for the article, very useful :)
I have a question regarding the IAM permissions. In the serverless.yml three permissions were granted:
- "sqs:SendMessage"
- "sqs:GetQueueUrl"
- "sqs:ListQueues"

Why are sqs:GetQueueUrl and sqs:ListQueues needed for this example? Are they a prerequisite for being able to send the messages in sender.js?
Regarding sqs:SendMessage, what will be the scope of the permission? Will it only allow the lambdas declared within the serverless.yml file to be able to send messages, or will it allow any service within the AWS account and/or public network to access it?
Thx!
Manu

Marcin Piczkowski Author

Hi, the permissions are needed for sender to send message to queue.
The scope of permission is only for lambdas within this CloudFormation stack.

Pedro Rodrigues

Do I have to delete the message after receiving and processing?

Like:

var params = {
    QueueUrl: 'STRING_VALUE', /* required */
    ReceiptHandle: 'STRING_VALUE' /* required */
};

sqs.deleteMessage(params, function(err, data) {
    if (err) console.log(err, err.stack); // an error occurred
    else console.log(data); // successful response
});

Marcin Piczkowski Author

Hi Pedro, I was confused at the beginning too, because this was the way to handle messages before SQS events were supported in Lambda when you had to poll the queue yourself.

It is not needed anymore, as explained in the AWS documentation (docs.aws.amazon.com/lambda/latest/...)

Lambda takes care of:
 - Automatically retrieving messages and directing them to the target Lambda function.
 - Deleting them once your Lambda function successfully completes.

What is important is the "successfully completes", because if the Lambda fails the message will remain in the queue and will be processed again, so you need to handle exceptions carefully.

You can e.g. wrap the Lambda code in a try-catch clause so that a bad message does not make the function fail, or configure a dead-letter queue to which failed messages are redirected automatically after the Lambda fails. Then you can have e.g. a separate Lambda for the dead-letter queue to handle such messages.
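
A minimal sketch of the try-catch variant, assuming that a malformed message should simply be logged and skipped. The names `processRecord` and `handler` are illustrative; in a real Lambda file the function would be assigned to exports.handler.

```javascript
// Processes one SQS record; throws if the body is not valid JSON.
function processRecord(record) {
    const payload = JSON.parse(record.body);
    console.log('text:', payload.text);
}

// Handler that does not fail because of a single malformed message.
const handler = function (event, context, callback) {
    let failed = 0;
    event.Records.forEach(function (record) {
        try {
            processRecord(record);
        } catch (err) {
            // Log and skip: reporting success lets Lambda delete the message
            // instead of delivering it again and again.
            failed += 1;
            console.log('skipping message', record.messageId, err.message);
        }
    });
    callback(null, { processed: event.Records.length - failed, failed: failed });
};
```

The trade-off is that skipped messages are lost once the invocation succeeds; if they must be kept, the dead-letter queue approach is the safer choice.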

I'm planning to write more about Lambda error handling soon.

George Mihailov

Connect AWS Api Gateway to SQS pretty much by using CloudFormation

cloudhut.io/connect-aws-api-gatewa...

Marcin Piczkowski Author

Cool, looks like Serverless Framework does not support yet triggering SQS from Api Gateway, so CloudFormation is the only way, but it is work in progress: forum.serverless.com/t/api-gateway...

Juan Siesquen

One question more ...

Does this workflow also work for FIFO queues? Or is there some difference?

Thanks!

Marcin Piczkowski Author

No, it only works for standard queue, sqs triggers do not work for fifo. You are left with polling the queue yourself.

Juan Siesquen

It means that I can publish (sender.js) messages from a lambda, but I can not read (receive.js) the queue from another lambda? In summary, what would the new flow be?

Marcin Piczkowski Author • Edited

In receive.js you would need to use aws sqs API to fetch messages from your fifo queue. You would have to run receive.js periodically, e.g. schedule it as a cron job (see here), because it won't be triggered by new messages in the queue.
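
A rough sketch of such a polling function, under a few assumptions: `pollOnce` is a hypothetical helper, and the `sqs` argument is any object with the callback-style receiveMessage and deleteMessage methods of the AWS SDK's SQS client (for example `new AWS.SQS({ region: 'us-east-1' })` as in sender.js). Note that messages fetched this way carry `Body` and `ReceiptHandle` with capital letters, unlike the records in a Lambda SQS event.

```javascript
// Polls the queue once and processes whatever messages come back.
function pollOnce(sqs, queueUrl, processMessage, done) {
    const receiveParams = {
        QueueUrl: queueUrl,
        MaxNumberOfMessages: 10, // the most SQS returns per call
        WaitTimeSeconds: 5       // long polling, to avoid empty responses
    };

    sqs.receiveMessage(receiveParams, function (err, data) {
        if (err) return done(err);

        const messages = data.Messages || [];
        let pending = messages.length;
        if (pending === 0) return done(null, 0);

        // Calls done() once every received message has been handled.
        function finish() {
            pending -= 1;
            if (pending === 0) done(null, messages.length);
        }

        messages.forEach(function (message) {
            try {
                processMessage(message.Body);
            } catch (processErr) {
                // Not deleted: the message becomes visible again after the visibility timeout.
                console.log('processing failed:', processErr.message);
                return finish();
            }
            // Unlike with an SQS trigger, we have to delete processed messages ourselves.
            sqs.deleteMessage({
                QueueUrl: queueUrl,
                ReceiptHandle: message.ReceiptHandle
            }, function (deleteErr) {
                if (deleteErr) console.log('delete failed:', deleteErr.message);
                finish();
            });
        });
    });
}
```

The scheduled Lambda would call pollOnce from its handler and invoke the Lambda callback inside `done`.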
