You can use the script below to check whether you are receiving the metrics correctly.
import boto3
from datetime import datetime, timedelta
import time
import pytz
cloudwatch = boto3.client('cloudwatch')
def get_cloudwatch_metrics(namespace, metric_name, dimensions, start_time, end_time,
                           period=30, stat='Sum'):
    """Query a single CloudWatch metric with GetMetricData.

    Args:
        namespace: Metric namespace (for example "CWAgent").
        metric_name: Name of the metric to query.
        dimensions: List of {'Name': ..., 'Value': ...} dicts identifying
            the metric.
        start_time: Start of the query window, sent as 'StartTime'.
        end_time: End of the query window, sent as 'EndTime'.
        period: Aggregation period in seconds sent as 'Period'.
            Defaults to 30, the value that used to be hard-coded.
        stat: Statistic sent as 'Stat'. Defaults to 'Sum', the value that
            used to be hard-coded.

    Returns:
        The 'MetricDataResults' list of the response, or an empty list when
        the request (or reading the response) raised an exception.
    """
    params = {
        'StartTime': start_time,
        'EndTime': end_time,
        'MetricDataQueries': [
            {
                'Id': 'm1',
                'MetricStat': {
                    'Metric': {
                        'Namespace': namespace,
                        'MetricName': metric_name,
                        'Dimensions': dimensions,
                    },
                    'Period': period,
                    'Stat': stat,
                },
            },
        ],
    }
    try:
        response = cloudwatch.get_metric_data(**params)
        return response['MetricDataResults']
    except Exception as e:
        # Deliberately best-effort: this is a diagnostic helper, so a failed
        # request is reported and the caller's polling loop keeps running.
        print(f"Error fetching metrics: {e}")
        return []
def main():
    """Poll CloudWatch forever and print the raw result of every query.

    Each iteration queries a window of ``collection_interval_seconds`` that
    ends ``end_time_lag_seconds`` before the current UTC time, prints the
    current time, the window bounds and the returned data, then sleeps for
    ``poll_interval_seconds``. The loop never returns; stop it with Ctrl-C.
    """
    namespace = "CWAgent"
    metric_name = "<MetricsName>"
    dimensions = [
        {'Name': 'path', 'Value': 'pathName'},
        {'Name': 'metric_type', 'Value': 'metric_type_value'},
    ]
    target_value = 1000  # only used by the optional target check below
    collection_interval_seconds = 30
    end_time_lag_seconds = 10   # how far behind "now" the window ends
    poll_interval_seconds = 10  # pause between two queries

    # utc_now = datetime.now(pytz.UTC) - timedelta(seconds=300)
    while True:
        end_time = datetime.now(pytz.UTC) - timedelta(seconds=end_time_lag_seconds)
        start_time = end_time - timedelta(seconds=collection_interval_seconds)

        # The window bounds are passed on (and printed) as second-resolution
        # strings without a UTC offset.
        start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%S')
        end_time_str = end_time.strftime('%Y-%m-%dT%H:%M:%S')

        metrics_data = get_cloudwatch_metrics(
            namespace, metric_name, dimensions, start_time_str, end_time_str
        )

        print("Currrent Time: ", datetime.now(pytz.UTC))
        print("Start Time: ", start_time_str)
        print("End Time: ", end_time_str)
        print(metrics_data)

        # Optional: compare the aggregated value against target_value.
        # if metrics_data:
        #     aggregated_value = sum(metrics_data[0]['Values']) if metrics_data[0]['Values'] else None
        #     if aggregated_value and aggregated_value >= target_value:
        #         print(f"Target metric value met or exceeded: {aggregated_value}")
        #     else:
        #         print(f"Current aggregated value: {aggregated_value}, below target.")
        # else:
        #     print("No metrics data received.")

        print(f"Sleeping for {poll_interval_seconds} seconds...")
        time.sleep(poll_interval_seconds)
        print()
        print()
# Start polling only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Requirements.txt:
boto3==1.34.117
botocore==1.34.117
jmespath==1.0.1
python-dateutil==2.9.0.post0
pytz==2024.1
s3transfer==0.10.1
six==1.16.0
urllib3==2.2.1
I have already explained each parameter and its function. For more information, you can refer to this blog:
Scale Deployments Based on HTTP Requests with Keda
CodeWithVed ・ Sep 17
In the Python script above, some parts are commented out; how you use them is entirely up to you. Since I only wanted to check the metric values, how often they are printed, and at what time, I simply printed the timestamps. You can also modify collection_interval_seconds and the Period based on your needs.
Alternatively, here is the equivalent Go code, which may help if your primary codebase is written in Go:
package main
import (
"fmt"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
)
// main polls CloudWatch in an endless loop. Every iteration it queries the
// "Sum" of one metric over the last collectionIntervalSeconds, adds up the
// returned values and prints whether the total reached targetValue.
func main() {
	namespace := "CWAgent"
	metricName := "ingestion/commandservice/totalRequest_count"
	dimensions := []*cloudwatch.Dimension{
		{Name: aws.String("path"), Value: aws.String("pathName")},
		{Name: aws.String("metric_type"), Value: aws.String("metricsTypeValue")},
	}
	targetValue := float64(100)
	collectionIntervalSeconds := int64(30)
	pollInterval := 10 * time.Second

	// The session and client do not change between iterations, so build them
	// once instead of on every pass through the loop.
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("us-east-1"),
	})
	if err != nil {
		fmt.Println("Error creating session:", err)
		return
	}
	svc := cloudwatch.New(sess)

	for {
		endTime := time.Now().UTC()
		startTime := endTime.Add(-time.Duration(collectionIntervalSeconds) * time.Second)

		// To query a fixed window instead, parse explicit timestamps:
		// var endTime, startTime time.Time
		// layout := "2006-01-02T15:04:05Z"
		// endTime, err := time.Parse(layout, "2024-06-05T05:52:33Z")
		// startTime, err = time.Parse(layout, "2024-06-05T05:51:33Z")
		// if err != nil {
		// 	fmt.Println("Error parsing time:", err)
		// 	return
		// }

		params := &cloudwatch.GetMetricDataInput{
			EndTime:   &endTime,
			StartTime: &startTime,
			MetricDataQueries: []*cloudwatch.MetricDataQuery{
				{
					Id: aws.String("m1"),
					MetricStat: &cloudwatch.MetricStat{
						Metric: &cloudwatch.Metric{
							Namespace:  aws.String(namespace),
							MetricName: aws.String(metricName),
							Dimensions: dimensions,
						},
						// NOTE(review): the period (10s) is shorter than the
						// 30s query window - confirm this is intended.
						Period: aws.Int64(10),
						Stat:   aws.String("Sum"),
					},
				},
			},
		}

		result, err := svc.GetMetricData(params)
		if err != nil {
			fmt.Println("Error getting metric data:", err)
			// Wait before retrying; without this pause a persistent error
			// would turn the loop into a tight retry against the API.
			time.Sleep(pollInterval)
			continue
		}

		var aggregatedValue float64
		if len(result.MetricDataResults) > 0 && result.MetricDataResults[0].Values != nil {
			aggregatedValue = sumFloat64Slice(result.MetricDataResults[0].Values)
		}

		fmt.Printf("Start Time: %s\n", startTime.Format(time.RFC3339))
		fmt.Printf("End Time: %s\n", endTime.Format(time.RFC3339))
		// for _, value := range result.MetricDataResults[0].Values {
		// 	fmt.Println(*value)
		// }

		if aggregatedValue >= targetValue {
			fmt.Printf("Target metric value met or exceeded: %.2f\n", aggregatedValue)
		} else {
			fmt.Printf("Current aggregated value: %.2f, below target.\n", aggregatedValue)
		}

		time.Sleep(pollInterval)
	}
}
// sumFloat64Slice returns the sum of the values pointed to by slice.
// Nil pointers are skipped instead of being dereferenced, so a nil entry
// cannot panic the caller. A nil or empty slice yields 0.
func sumFloat64Slice(slice []*float64) float64 {
	sum := float64(0)
	for _, v := range slice {
		if v == nil {
			continue
		}
		sum += *v
	}
	return sum
}
Top comments (0)