DEV Community

AWS Lambda SQS events with Serverless Framework

Marcin Piczkowski on July 01, 2018

This post was initially published at Cloudly.Tech On 28 JUN 2018 Amazon announced adding support for SQS events in Lambda. This greatly simpl...
Collapse
 
jswhisperer profile image
Greg, The JavaScript Whisperer

Hey I can't find much about reservedConcurrency when you say limit to 1 thread, do lambdas share threads with more concurrency? Also if you limit the concurrency are the extra executions queued up for a bit or just fail? like if I call 100 lambda executions and there is a reservedConcurrency of 1, will it run 1 at a time until all 100 are up?
Cheers
Greg

Collapse
 
piczmar_0 profile image
Marcin Piczkowski

Greg, please have a look at this doc section "Throttling Behavior"

In short: when you exceed the reservedConcurrency limit, let's say call 100 lambdas while you set the limit to 1, you will experience throttling and the whether lambda will retry your call or not depends on the source of the event, e.g. if you call Lambda from API Gateway you will get the error from your API straight without any retry (you need to handle retries yourself in API called if you wish to)

Collapse
 
kbariotis profile image
Kostas Bariotis

After we experience throtlling, what happens to the messages that cannot be processed? are they going back to the queue? Does SQS considers them failed?

Thread Thread
 
piczmar_0 profile image
Marcin Piczkowski • Edited

If your Lambda function is invoked asynchronously, e.g. from other function, AWS Lambda automatically retries the throttled event for up to six hours, with delays between retries.

In case of lambda invoked from sqs (poll-based) AWS Lambda will continue to retry until MessageRetentionPeriod set for the queue expires or if you have redrive policy set on the queue then until MaxReceiveCount after which the message goes to DLQ.

docs.aws.amazon.com/lambda/latest/...

Thread Thread
 
kbariotis profile image
Kostas Bariotis

Yep got you, problem is that you cannot have a normal sequential processing by a certain number, say by 1, and fill up the queue. In a similar case, I ended up setting the MaxReceiveCount to 1000 in order to achieve that. Which worked alright but brought other issues, e.g. the message must not fail cause it will just keep retrying for a long time.

Thread Thread
 
piczmar_0 profile image
Marcin Piczkowski

I'm not sure I get your problem. Would be nice if you could illustrate it with example project. By sequential you mean processing messages in order I suppose but for this you would need fifo queues or design the app differently, e.g. say you store the current processing state in db. When message2 arrives before message1 you check in db that message1 was not yet processed and you put message1 in another queue and have another lambda which will forward the message back to the original queue (then the receive count is 0 again), when message1 arrives next you process it and update db state so when next time you receive message2 you will know that you can process it already. You could also use the DLQ for that and the other lambda could pass messages from DLQ back to the original queue. Just an idea, don't know your particular case but maybe it works for you.

Thread Thread
 
kbariotis profile image
Kostas Bariotis

Ah no I wasn't talking about order but rather the regulation flow. So I want the messages to wait in the queue until the worker can process them but with SQS/Lambda the messages won't wait since they are experiencing throttling and will fail and will either be removed or go to the DLQ.

But I liked you technique on preserving order. I haven't tried it, maybe I will in the future.

Thread Thread
 
piczmar_0 profile image
Marcin Piczkowski

Hmm.. I see. So maybe you could try the same technique here. When throttled message goes to DLQ you have a lambda function running in some scheduled interval which you set by experimenting with throttle rate, and this lambda will forward the message from DLQ back to original queue from which it could possibly be processed again and hopefully not throttled again.

Thread Thread
 
kbariotis profile image
Kostas Bariotis

I just created this repo and remembered of our discussion. This is how I managed to create a sequential processing pipeline with AWS SQS and Lambda on a Serverless project.

Let me know what you think! Thank you.

github.com/kbariotis/sqs-lambda-co...

Thread Thread
 
piczmar_0 profile image
Marcin Piczkowski

Nice :) I like it.

I tried to find out what happens to the CloudWatch event which is throttled, if it is retried or dropped but I could not see in docs, but I suppose it is more a problem of AWS what to do with high number of cumulated CloudWatch events which are continuously retried, rather than your problem, since setting of concurrency limit on Lambda to 1 provides that no more than 1 instance of this function will execute at certain time.

Nice wrap up. Thanks.

Collapse
 
jsiesquen profile image
Juan Siesquen • Edited

Hi Marcin,

Have an error when I try make a POST request to URL del API Gateway:

This is the response:
502 Bad Gateway
{
"message": "Internal server error"
}

What happend?

I not found nothing in CloudWatch Logs about of error into the REST resource.

Regards,

Collapse
 
jsiesquen profile image
Juan Siesquen

Make a test on API Gateway POST verb and your log show me is:
Execution failed due to configuration error: Malformed Lambda proxy response
Method completed with status: 502

But have well formed my response in my sender....

var response = {
"statusCode": responseCode,
"headers": {
"Content-Type": "application/json"
},
"body": JSON.stringify(responseBody),
"isBase64Encoded": false
};

Read this...
aws.amazon.com/es/premiumsupport/k...

But nothing! wuu

Collapse
 
piczmar_0 profile image
Marcin Piczkowski

Can you put your whole handler code somewhere, e.g
on gist to check how the function looks like?

Thread Thread
 
jsiesquen profile image
Juan Siesquen

I just re-created the sender.js and it goes well. I think he wrote something wrong. The important thing is that I could check how the data exchange between lambda and SQS works. Thank you!

Only a tiny addition:
if (err) {
console.log('error:', "failed to send message" + err);
responseCode = 500;
responseBody = err;
...

I did not understand the concurrency part yet. If my Lambda has more than 2000 records to send to SQS, I will send them one by one to the queue. I think the lambda will take a long time, can I send batch queues? Each record is a message to the queue.

Maybe it illuminates us with an additional post about this?

Thank you so much Marcin!

Thread Thread
 
piczmar_0 profile image
Marcin Piczkowski

you are allowed to send at most 10 messages in single batch docs.aws.amazon.com/AWSSimpleQueue...

I would delegate this task to multiple lambdas if you need to send batches concurrently to speedup.

Collapse
 
emramos profile image
Emanuel MΓΌller Ramos

Thanks for the article, very useful :)
I have a question regarding the IAM permissions. In the serverless.yml three permissions were granted:
- "sqs:SendMessage"
- "sqs:GetQueueUrl"
- "sqs:ListQueues"

Why sqs:GetQueueUrl and sqs:ListQueues are needed for this example? Are they a pre-requisite for being able to send the messages in sender.js?
Regarding sqs:SendMessage, what will be the scope of the permission? Will it only allow the lambdas declared within the serverless.yml file to be able to send messages, or will it allow any service within the AWS account and/or public network to access it?
Thx!
Manu

Collapse
 
piczmar_0 profile image
Marcin Piczkowski

Hi, the permissions are needed for sender to send message to queue.
The scope of permission is only for lambdas within this CloudFormation stack.

Collapse
 
majoraze profile image
Pedro Rodrigues

Do I have to delete the message after receiving and processing?

Like:

var params = {
QueueUrl: 'STRING_VALUE', /* required /
ReceiptHandle: 'STRING_VALUE' /
required */
};

sqs.deleteMessage(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else console.log(data); // successful response
});

Collapse
 
piczmar_0 profile image
Marcin Piczkowski

Hi Pedro, I was confused at the beginning too, because this was the way to handle messages before SQS events were supported in Lambda when you had to poll the queue yourself.

It is not needed anymore and explained in AWS documentation (docs.aws.amazon.com/lambda/latest/...)

Lambda takes care of:
 - Automatically retrieving messages and directing them to the target Lambda function.
 - Deleting them once your Lambda function successfully completes.

What is important is the "successfully completes", because if the Lambda fails the message will remain in the queue and will be processed again, so you need to handle exceptions carefully.

You can e.g. surround lambda in try-catch clause and do not let Lambda fail because of wrong message or configure dead-letter queues where failed messages will be redirected automatically after Lambda fails. Then you can have e.g. separate lambda for dead-letter queues to handle such messages.

I'm planning to write more about Lambda error handling soon.

Collapse
 
mihailoff profile image
George Mihailov

Connect AWS Api Gateway to SQS pretty much by using CloudFormation

cloudhut.io/connect-aws-api-gatewa...

Collapse
 
piczmar_0 profile image
Marcin Piczkowski

Cool, looks like Serverless Framework does not support yet triggering SQS from Api Gateway, so CloudFormation is the only way, but it is work in progress: forum.serverless.com/t/api-gateway...

Collapse
 
jsiesquen profile image
Juan Siesquen

One question more ...

This workflow work for FIFO Queue also? Or exist some difference?

Thanks!

Collapse
 
piczmar_0 profile image
Marcin Piczkowski

No, it only works for standard queue, sqs triggers do not work for fifo. You are left with polling the queue yourself.

Collapse
 
jsiesquen profile image
Juan Siesquen

It means that I can publish (sender.js) messages from a lambda, but I can not read (receive.js) the queue from another lambda? In summary, what would the new flow be?

Thread Thread
 
piczmar_0 profile image
Marcin Piczkowski • Edited

In receive.js you would need to use aws sqs API to fetch messages from your fifo queue. You would have to run receive.js periodically, e.g. schedule it as a cron job (see here), because it won't be triggered by new messages in the queue.