
Muhammad Ahmad Khan


ECS auto-scaling based on Amazon MQ for RabbitMQ’s queue depth.

Scaling an application based on queue depth is a very common use case in the industry. AWS itself has published documentation on this, which can be found here.

In this blog post we will discuss ECS auto-scaling based on queue depth using AWS Application Auto Scaling.

There are three options you can use with AWS Application Auto Scaling:

  • Target Tracking (Policy)
  • Step Scaling (Policy)
  • Scheduled scaling (Action)

Here we will discuss only the first two.

Target Tracking Scaling Policies:

A target tracking scaling policy increases or decreases the number of tasks that your service runs based on a target value for a specific CloudWatch metric (predefined or custom).
A target tracking scaling policy assumes that it should scale out your ECS tasks when the specified metric is above the target value and scale in when it is below the target value.
In short, with target tracking scaling policies, Amazon ECS creates and manages the CloudWatch alarms that trigger the scaling policy and calculates the scaling adjustment based on the CloudWatch metric and the target value that you specify. With target tracking, AWS controls the scaling adjustments automatically based on the target you specify.

Step Scaling Policies:

A step scaling policy increases or decreases the number of tasks that your service runs based on a set of scaling adjustments, known as step adjustments. This means you can define multiple actions that vary the scaling depending on the size of the alarm breach.
In short, step scaling policies increase or decrease current capacity based on a set of step adjustments that you specify. With step scaling, you control the scaling adjustments.
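
To make the difference concrete, here is a minimal sketch (not from the original post) of what a queue-depth-driven step scaling setup could look like with boto3. The policy name, alarm name, threshold, and step adjustments are hypothetical; the resource IDs and queue dimensions reuse the ones that appear later in this post.

import boto3

aas = boto3.client("application-autoscaling")
cw = boto3.client("cloudwatch")

# Step scaling: we define the step adjustments ourselves.
policy = aas.put_scaling_policy(
    PolicyName="queue-depth-step-scaling",  # hypothetical name
    ServiceNamespace="ecs",
    ResourceId="service/dev-worker-cluster/dev-worker-service",
    ScalableDimension="ecs:service:DesiredCount",
    PolicyType="StepScaling",
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity",
        "Cooldown": 60,
        "MetricAggregationType": "Average",
        # Intervals are offsets from the alarm threshold (1000 messages below):
        "StepAdjustments": [
            {"MetricIntervalLowerBound": 0, "MetricIntervalUpperBound": 500, "ScalingAdjustment": 1},
            {"MetricIntervalLowerBound": 500, "ScalingAdjustment": 3},
        ],
    },
)

# The policy is triggered by a CloudWatch alarm on the queue-depth metric;
# how far the alarm is breached decides which step adjustment fires.
cw.put_metric_alarm(
    AlarmName="rabbitmq-queue-depth-high",  # hypothetical name
    Namespace="AWS/AmazonMQ",
    MetricName="MessageReadyCount",
    Dimensions=[
        {"Name": "Broker", "Value": "scan-dev"},
        {"Name": "VirtualHost", "Value": "/"},
        {"Name": "Queue", "Value": "scans_dev"},
    ],
    Statistic="Average",
    Period=60,
    EvaluationPeriods=1,
    Threshold=1000,  # hypothetical threshold
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=[policy["PolicyARN"]],
)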

When working with these two policies, note that you can easily scale your application on queue depth using step scaling, but not with target tracking out of the box, because target tracking for ECS only supports the following three predefined metrics (a minimal predefined-metric sketch follows the list below):

  • ALBRequestCountPerTarget (load balancer metric)
  • ECSServiceAverageCPUUtilization
  • ECSServiceAverageMemoryUtilization
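
For comparison, here is a minimal, hypothetical sketch of a target tracking policy on one of those predefined metrics (ECSServiceAverageCPUUtilization); the policy name and target value are illustrative only.

import boto3

aas = boto3.client("application-autoscaling")

# Target tracking on a predefined ECS metric: AWS manages the alarms and
# the size of each scaling adjustment for us.
aas.put_scaling_policy(
    PolicyName="cpu-target-tracking",  # hypothetical name
    ServiceNamespace="ecs",
    ResourceId="service/dev-worker-cluster/dev-worker-service",
    ScalableDimension="ecs:service:DesiredCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 60.0,  # hypothetical target: keep average CPU around 60%
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "ECSServiceAverageCPUUtilization"
        },
        "ScaleOutCooldown": 60,
        "ScaleInCooldown": 60,
    },
)

Scaling on queue depth instead requires a customized metric specification, which is what the rest of this post builds.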

Further, since with target tracking AWS itself creates the CloudWatch alarms and manages the scaling, you do not have the freedom to scale on any metric you like without first creating a custom metric. You therefore need to publish a custom metric, as described in the AWS documentation.

To monitor the number of unconsumed messages in SQS, we can use the metric named ApproximateNumberOfMessagesVisible; to monitor messages in Amazon MQ for RabbitMQ, we can use the metric named MessageReadyCount.
One more problem with using a CloudWatch SQS metric like ApproximateNumberOfMessagesVisible for target tracking is that the number of messages in the queue might not change proportionally to the number of tasks within a service that processes messages from the queue. That's because the number of messages in your SQS queue does not solely define the number of tasks needed. The number of tasks in your ECS service can be driven by multiple factors, including how long it takes to process a message and the acceptable amount of latency (queue delay).
AWS proposes a solution to this problem by introducing a backlog per instance metric with the target value being the acceptable backlog per instance.

The AWS documentation states:
Backlog per instance: To calculate your backlog per instance, start with the ApproximateNumberOfMessages queue attribute to determine the length of the SQS queue (number of messages available for retrieval from the queue). Divide that number by the fleet's running capacity, which for an Auto Scaling group is the number of instances in the InService state, to get the backlog per instance.
Acceptable backlog per instance: To calculate your target value, first determine what your application can accept in terms of latency. Then, take the acceptable latency value and divide it by the average time that an EC2 instance takes to process a message.

In other words, backlog per instance is the value obtained by dividing the number of messages yet to be consumed in the queue by the total number of tasks currently running.
Similarly, acceptable backlog per instance is the target value that we use in our scaling policy, and it is the acceptable latency value divided by the average time that an ECS task takes to process a message.
This acceptable backlog per instance (target value) varies from application to application and use case to use case.

Example from AWS documentation:
As an example, let's say that you currently have an Auto Scaling group with 10 instances and the number of visible messages in the queue (ApproximateNumberOfMessages) is 1500. If the average processing time is 0.1 seconds for each message and the longest acceptable latency is 10 seconds, then the acceptable backlog per instance is 10 / 0.1, which equals 100 messages. This means that 100 is the target value for your target tracking policy. When the backlog per instance reaches the target value, a scale-out event will happen. Because the backlog per instance is already 150 messages (1500 messages / 10 instances), your group scales out, and it scales out by five instances to maintain proportion to the target value.
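
The arithmetic in that example can be reproduced in a few lines of Python (values taken directly from the quote above):

# Values from the AWS example above.
visible_messages = 1500        # ApproximateNumberOfMessages
running_instances = 10         # current fleet capacity
avg_processing_time = 0.1      # seconds per message
acceptable_latency = 10        # seconds

backlog_per_instance = visible_messages / running_instances                 # 150.0
acceptable_backlog_per_instance = acceptable_latency / avg_processing_time  # 100.0 -> target value

# Target tracking scales out until backlog per instance is back at the target:
required_instances = visible_messages / acceptable_backlog_per_instance     # 15.0 -> scale out by 5
print(backlog_per_instance, acceptable_backlog_per_instance, required_instances)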

Following the AWS documentation, I created a Lambda function that publishes custom metrics to a custom namespace named “Queue Based Scaling Metrics”. In this scenario, however, our application uses Amazon MQ for RabbitMQ rather than SQS.

See the simple architecture flow in the diagram below.

Architectural flow diagram

In our Lambda, we calculate all the values for our custom metrics, including backlog per instance and acceptable backlog per instance, as described in the AWS documentation.
For this purpose, we need to set two parameter values (acceptable_latency, time_process_per_message) as mentioned above; these parameters will vary from application to application. In my scenario, I have set them as follows:

  • acceptable latency = 250 (seconds)
  • time process per message = 0.8 (seconds)

Here is the lambda code for all the calculations:

              src/app.py

import boto3
import dateutil.tz  # needed for dateutil.tz.tzlocal() when publishing the metric timestamp
from datetime import date, timedelta, datetime

def lambda_handler(event, context):
    cw_client = boto3.client('cloudwatch')
    ecs_client = boto3.client('ecs')
    cluster_name = event["cluster_name"]
    service_name = event['service_name']
    mq_cluster_name = event["mq_cluster_name"]
    mq_queue_name = event["mq_queue_name"]
    acceptable_latency = event["acceptable_latency"]
    time_process_per_message = event["time_process_per_message"]
    queue_attribute_calculation(cw_client, ecs_client, cluster_name, service_name, mq_cluster_name, mq_queue_name, acceptable_latency,
                                time_process_per_message)


def queue_attribute_calculation(cw_client, ecs_client, cluster_name, service_name, mq_cluster_name, mq_queue_name, acceptable_latency,
                                time_process_per_message):
    response = ecs_client.describe_services(cluster=cluster_name, services=[service_name])
    running_task_count = response['services'][0]['runningCount']
    print("Running Task: " + str(running_task_count))

    # Query window from two days ago until tomorrow so the latest datapoint is always included
    yesterday = date.today() - timedelta(days=2)
    tomorrow = date.today() + timedelta(days=1)
    response = cw_client.get_metric_data(
        MetricDataQueries=[
            {
                'Id': 'mq1',
                'MetricStat': {
                    'Metric': {
                        'Namespace': 'AWS/AmazonMQ',
                        'MetricName': 'MessageReadyCount',
                        'Dimensions': [
                            {
                                'Name': 'Broker',
                                'Value': mq_cluster_name
                            },
                            {
                                'Name': "VirtualHost",
                                'Value': "/"
                            },
                            {
                                'Name': "Queue",
                                'Value': mq_queue_name
                            }
                        ]
                    },
                    'Period': 1,
                    'Stat': 'Sum',
                    'Unit': 'Count'
                },
            },
        ],
        StartTime=datetime(yesterday.year, yesterday.month, yesterday.day),
        EndTime=datetime(tomorrow.year, tomorrow.month, tomorrow.day)
    )

    queue_size = response['MetricDataResults'][0]['Values'][0]

    print("Running Task: " + str(running_task_count))
    print("Queue Message Count (per second): " + str(queue_size))

    """
    Backlog Per Capacity Unit = Queue Size (MessageReadyCount) / Running Capacity of ECS Task Count
    """
    try:
        backlog_per_capacity_unit = int(int(queue_size) / running_task_count)
    except ZeroDivisionError as err:
        print('Handling run-time error:', err)
        backlog_per_capacity_unit = 0
    print("Backlog Per Capacity Unit: " + str(backlog_per_capacity_unit))

    """
    Acceptable backlog per capacity unit = Acceptable Message Processing Latency (seconds) / Average time to Process a Message each Task (seconds)
    """
    acceptable_backlog_per_capacity_unit = int(acceptable_latency) / float(time_process_per_message)
    print("Acceptable backlog per capacity unit: " + str(acceptable_backlog_per_capacity_unit))

    putMetricToCW(cw_client, 'AmazonMQ', mq_cluster_name, 'Queue', mq_queue_name, 'MessageReadyCount', int(queue_size),
                  'Queue Based Scaling Metrics')
    putMetricToCW(cw_client, 'AmazonMQ', mq_cluster_name, 'Queue', mq_queue_name, 'BackLogPerCapacityUnit', backlog_per_capacity_unit,
                  'Queue Based Scaling Metrics')
    putMetricToCW(cw_client, 'AmazonMQ', mq_cluster_name, 'Queue', mq_queue_name, 'AcceptableBackLogPerCapacityUnit', acceptable_backlog_per_capacity_unit,
                  'Queue Based Scaling Metrics')


def putMetricToCW(cw, dimension_name, dimension_value, dimension_sub_attribute_name, dimension_sub_attribute_value, metric_name, metric_value, namespace):
    cw.put_metric_data(
        Namespace=namespace,
        MetricData=[
            {
                'MetricName': metric_name,
                'Dimensions': [
                    {
                        'Name': dimension_name,
                        'Value': dimension_value
                    },
                    {
                        'Name': dimension_sub_attribute_name,
                        'Value': dimension_sub_attribute_value
                    }
                ],
                'Timestamp': datetime.now(dateutil.tz.tzlocal()),
                'Value': metric_value
            }
        ]
    )


Note that in order to get the Amazon MQ for RabbitMQ metric (MessageReadyCount), I used the CloudWatch client SDK, because I was unable to find an API call that returns this metric directly from the Amazon MQ for RabbitMQ broker client SDK.
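
If you want to sanity-check the calculations before wiring up the schedule, the handler can be invoked locally with an event of the same shape that the CloudWatch Event Rule passes in the SAM template below. This is a hypothetical local run; the values mirror the .sam-params-dev file shown later, and the numeric parameters arrive as strings because that is how the template passes them.

# Hypothetical local invocation (run from the src/ directory, with AWS
# credentials that can call ECS and CloudWatch).
from app import lambda_handler

test_event = {
    "cluster_name": "dev-worker-cluster",
    "service_name": "dev-worker-service",
    "mq_cluster_name": "scan-dev",
    "mq_queue_name": "scans_dev",
    "acceptable_latency": "250",
    "time_process_per_message": "0.8",
}
lambda_handler(test_event, None)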

I deployed this Lambda using a SAM template; here it is.
              template.yml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  Stack which creates a lambda function that is invoked by a CloudWatch Event Rule every minute to update custom metric datapoints through aws sdk calls.

Parameters:
  Env:
    Type: String
    Description: Values can be dev, qa, devops, test, staging or prod
  ClusterNameParameter:
    Type: String
  ServiceNameParameter:
    Type: String
  MqClusterNameParameter:
    Type: String
  MqQueueNameParameter:
    Type: String
  AcceptableLatencyParameter:
    Type: Number
  TimeProcessPerMessageParameter:
    Type: Number
  SecurityGroupIdParameter:
    Type: String
  SubnetIdAParameter:
    Type: String
  SubnetIdBParameter:
    Type: String
  SubnetIdCParameter:
    Type: String

Resources:
  CustomMetricRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          -
            Effect: Allow
            Principal:
              Service:
                - 'lambda.amazonaws.com'
                - 'events.amazonaws.com'
            Action:
              - 'sts:AssumeRole'
      Policies:
        - PolicyName: 'LambdaBasicExecutionPolicy'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: LambdaExecutionPolicy
                Effect: Allow
                Action:
                - ec2:Describe*
                - ec2:CreateNetworkInterface
                - ec2:DeleteNetworkInterface
                - ec2:DescribeNetworkInterfaces
                - cloudwatch:GetMetricData
                - cloudwatch:PutMetricData
                - "ecs:*"
                Resource: '*'
              - Sid: ExtraPolicy
                Effect: Allow
                Action:
                - ssm:GetParameters
                - ssm:GetParameter
                Resource: '*'
        - PolicyName: 'LogPolicy'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              -
                Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: '*'

  CustomMetricFunction:
    Type: 'AWS::Serverless::Function'
    Properties:
      FunctionName: !Sub create-custom-metric-for-scaling-${Env}-cluster # FunctionName uses Env variable to identify its belonging to the Env specific cluster
      Description: A function that is invoked by a CloudWatch Event every minute to update custom metric datapoints through aws sdk calls.
      CodeUri: src/
      Handler: app.lambda_handler
      Runtime: python3.9
      Timeout: 900
      Role: !GetAtt CustomMetricRole.Arn
      Events:
        CloudWatchEventRule:
          Type: Schedule
          Properties:
            Input: !Sub '{"cluster_name": "${ClusterNameParameter}","service_name": "${ServiceNameParameter}","mq_cluster_name": "${MqClusterNameParameter}","mq_queue_name": "${MqQueueNameParameter}","acceptable_latency": "${AcceptableLatencyParameter}","time_process_per_message": "${TimeProcessPerMessageParameter}"}'
            # Read more about schedule expressions here: https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html
            Schedule: cron(* * * * ? *) # Invoke function every minute
      VpcConfig:
        SecurityGroupIds:
          - !Ref SecurityGroupIdParameter
        SubnetIds:
          - !Ref SubnetIdAParameter
          - !Ref SubnetIdBParameter
          - !Ref SubnetIdCParameter


Here is how to deploy the SAM template.
First, create the SAM template’s parameter file.

              .sam-params-dev

Env=dev
ClusterNameParameter=dev-worker-cluster
ServiceNameParameter=dev-worker-service
MqClusterNameParameter=scan-dev
MqQueueNameParameter=scans_dev
AcceptableLatencyParameter=250
TimeProcessPerMessageParameter=0.8
SecurityGroupIdParameter=sg-123456789
SubnetIdAParameter=subnet-123456789
SubnetIdBParameter=subnet-123456789
SubnetIdCParameter=subnet-123456789

sam deploy --stack-name create-custom-metric --template-file template.yml --s3-bucket test-deployment-bucket --capabilities CAPABILITY_IAM --region us-east-1 --parameter-overrides $(cat .sam-params-dev)

Finally, we will use the AWS CLI to apply the auto-scaling policy with a target value to the ECS service.
First, create the JSON policy configuration.

              config-dev.json

{
    "TargetValue": 312.5,
    "ScaleOutCooldown": 1,
    "ScaleInCooldown": 1,
    "CustomizedMetricSpecification": {
        "MetricName": "BackLogPerCapacityUnit",
        "Namespace": "Queue Based Scaling Metrics",
        "Dimensions": [
            {
                "Name": "AmazonMQ",
                "Value": "scan-dev"
            },
            {
                "Name": "Queue",
                "Value": "scans_dev"
            }
        ],
        "Statistic": "Average"
    }
}

In the above policy, the target value is the acceptable backlog per instance, i.e.:
acceptable latency / time process per message = 250 / 0.8 = 312.5
BackLogPerCapacityUnit is the custom metric that we created through our Lambda in the Queue Based Scaling Metrics namespace.
scan-dev is the RabbitMQ broker name and scans_dev is the queue name.

aws application-autoscaling register-scalable-target \
--service-namespace ecs \
--scalable-dimension ecs:service:DesiredCount \
--resource-id service/dev-worker-cluster/dev-worker-service \
--min-capacity 1 \
--max-capacity 10 \
--profile dev

aws application-autoscaling put-scaling-policy \
--policy-name ecs-scaling-based-on-message-consumption-rate \
--service-namespace ecs \
--resource-id service/dev-worker-cluster/dev-worker-service \
--scalable-dimension ecs:service:DesiredCount \
--policy-type TargetTrackingScaling \
--target-tracking-scaling-policy-configuration file://config-dev.json \
--profile dev

We could automate the creation of the Application Auto Scaling policy with Terraform or CloudFormation if required.
Note that we only apply the auto-scaling policy; AWS creates the required CloudWatch alarms itself.
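
If you want to confirm what was created, a quick check (not part of the original workflow) with boto3 lists the policy and the alarms that Application Auto Scaling generated for it:

import boto3

aas = boto3.client("application-autoscaling")
resp = aas.describe_scaling_policies(
    ServiceNamespace="ecs",
    ResourceId="service/dev-worker-cluster/dev-worker-service",
)
for policy in resp["ScalingPolicies"]:
    # Each target tracking policy lists the CloudWatch alarms AWS created for it.
    print(policy["PolicyName"], [a["AlarmName"] for a in policy.get("Alarms", [])])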

See the git repository here.
