DEV Community

Cover image for AWS Lambda Rust EMF metrics helper

AWS Lambda Rust EMF metrics helper

Photo by Isaac Smith on Unsplash

Introduction

In the AWS Lambda ecosystem, there is an awesome set of tools created to cover common scenarios when creating serverless functions. They are gathered together as Lambda Powertools and they are available in a few languages.

At the moment there is no version of Powertools for Rust (they will be created eventually). I saw this as a great opportunity to learn by building small helpers for AWS Lambda in Rust.

I am not attempting to replicate the Powertools suite. Instead, I’ll focus on implementing specific functionalities.

Metrics

I decided to start by creating utilities for CloudWatch custom metrics. When running AWS Lambda we have two main options to put our custom metrics to CloudWatch - using SDK, or printing the log shaped accordingly to Embedded Metrics Format (EMF). The latter is much cheaper and faster, but it requires some boilerplate.
EMF lets you publish up to 100 metric points at once and define up to 30 dimensions.

Goal

I create the library, which would let us

  • create metrics for defined namespace and dimensions
  • add more dimensions up to maximum limit defined by AWS
  • handle gracefully going beyond the limit of 100 metric points
  • publish metrics automatically once Lambda function finishes

As you see, not all functionalities provided by original Powertools are covered, yet the library should be already usable (and even useful, hopefully)

Implementation

The complete code for this implementation is available in the GitHub repository

I need two main pieces. First are types (structs, enums) that map to the EMF. I can use some serde magic to seamlessly serialize them to the JSON.
The second piece would be a struct with some methods to hold a mutable state of current metrics and allow actions on it.

Types

EMF format is defined here. After translating it to Rust I got something like this.

// lib.rs
//...
/// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure_metricdefinition
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct MetricDefinition {
    name: String,
    unit: MetricUnit,
    storage_resolution: u64,
}

/// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure_metricdirective
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct MetricDirective {
    namespace: String,
    dimensions: Vec<Vec<DimensionName>>,
    metrics: Vec<MetricDefinition>,
}

/// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure_metadata
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct MetadataObject {
    timestamp: i64,
    cloud_watch_metrics: Vec<MetricDirective>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct CloudWatchMetricsLog {
    #[serde(rename = "_aws")]
    aws: MetadataObject,
    #[serde(flatten)]
    dimensions: Dimensions,
    #[serde(flatten)]
    metrics_values: MetricValues,
}

// ...
Enter fullscreen mode Exit fullscreen mode
  • I use new type pattern to keep my types expressive
// lib.rs
// ...
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct Dimensions(HashMap<String, String>);

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct MetricValues(HashMap<String, f64>);

#[derive(Debug, Serialize, Deserialize)]
pub struct DimensionName(String);

#[derive(Debug, Serialize, Deserialize)]
pub struct Namespace(String);
// ...
Enter fullscreen mode Exit fullscreen mode
  • I decided to keep metrics values as a float. For some metrics, I would use integers, but I didn't want to add more complexity at this point

  • Metrics units are defined with enums

    // lib. rs
    // ...
    #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
    pub enum MetricUnit {
    Seconds,
    Microseconds,
    Milliseconds,
    Bytes,
    Kilobytes,
    Megabytes,  
    Gigabytes,
    Terabytes,
    Count,
    BytesPerSecond,
    KilobytesPerSecond,
    MegabytesPerSecond,
    GigabytesPerSecond,
    TerabytesPerSecond,
    BitsPerSecond,
    KilobitsPerSecond,
    MegabitsPerSecond,
    GigabitsPerSecond,
    TerabitsPerSecond,
    CountPerSecond,
    }
    // ...
    

The only logic I added here is the into function, which handles conversion to string, needed for printing the log. As we will see in the moment, this implementation requires some fixes, but I intentionally leave it as is for now, to showcase clippy capabilities.

// lib.rs
// ...
impl CloudWatchMetricsLog {
    pub fn into(self) -> String {
        serde_json::to_string(&self).unwrap()
    }
}
// ...
Enter fullscreen mode Exit fullscreen mode

Domain

Let's first define the domain types

// lib.rs
// ...
#[derive(Debug, Serialize, Deserialize)]
pub struct Metric {
    name: String,
    unit: MetricUnit,
    value: f64,
}
// ...
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct Metrics {
    namespace: Namespace,
    dimensions: Dimensions,
    metrics: Vec<Metric>,
}
// ...
Enter fullscreen mode Exit fullscreen mode

Now we need some functionalities.

Add dimension

This might fail, so no matter if we like it or not, we need to return Result and leave the responsibility of handling it to the caller. I will work on error types separately, so for now the Err is just a String

// lib.rs
// ...
impl Metrics {
// ...
pub fn try_add_dimension(&mut self, key: &str, value: &str) -> Result<(), String> {
        if self.dimensions.0.len() >= MAX_DIMENSIONS {
            Err("Too many dimensions".into())
        } else {
            self.dimensions.0.insert(key.to_string(), value.to_string());
            Ok(())
        }
    }
// ...
Enter fullscreen mode Exit fullscreen mode

Add metric

At first glance, this operation should return Result too, because, we might reach the limit of 100 metrics. Additionally, we can't post two data points for the same metric in one log.
Both cases are easily solvable - it is enough to flash current metrics and start collecting metrics from scratch.
The trade-off is that we expect metrics not to impact the performance of our lambda function. On the other hand, printing the log with a limited size sounds like a better strategy than returning an Err and forcing the caller to deal with it.

// lib.rs
// ...
impl Metrics {
// ...
pub fn add_metric(&mut self, name: &str, unit: MetricUnit, value: f64) {
        if self.metrics.len() >= MAX_METRICS
            || self.metrics.iter().any(|metric| metric.name == name)
        {
            self.flush_metrics();
        }
        self.metrics.push(Metric {
            name: name.to_string(),
            unit,
            value,
        });
    }
// ...
Enter fullscreen mode Exit fullscreen mode

Flush metrics

Flushing metrics means simply printing them to the console and removing current entries from our object

// lib.rs
// ...
impl Metrics {
// ...
pub fn flush_metrics(&mut self) {
        let payload = self.format_metrics().into();
        println!("{payload}");
        self.metrics = Vec::new();
    }
// ...
Enter fullscreen mode Exit fullscreen mode

Format metrics

The main part of the logic, which is at the same time just transforming domain types to the AWS EMF types

// lib.rs
// ...
pub fn format_metrics(&self) -> CloudWatchMetricsLog {
        let metrics_definitions = self
            .metrics
            .iter()
            .map(|entry| entry.into())
            .collect::<Vec<MetricDefinition>>();

        let metrics_entries = vec![MetricDirective {
            namespace: self.namespace.0.to_string(),
            dimensions: vec![self
                .dimensions
                .0
                .keys()
                .map(|key| DimensionName(key.to_string()))
                .collect()],
            metrics: metrics_definitions,
        }];

        let cloudwatch_metrics = MetadataObject {
            timestamp: Utc::now().timestamp_millis(),
            cloud_watch_metrics: metrics_entries,
        };

        let metrics_values = self
            .metrics
            .iter()
            .map(|metric| (metric.name.to_string(), metric.value))
            .collect::<HashMap<_, _>>();

        let cloudwatch_metrics_log = CloudWatchMetricsLog {
            aws: cloudwatch_metrics,
            dimensions: self.dimensions.clone(),
            metrics_values: MetricValues(metrics_values),
        };

        cloudwatch_metrics_log
    }
// ...
Enter fullscreen mode Exit fullscreen mode

Flush metrics automatically at the end of the function

One more thing to implement. the user can manually flush metrics, but it is not very convenient. Let's flush metrics once the function ends, which means that our metrics object goes out of scope

// lib.rs
// ...
impl Drop for Metrics {
    fn drop(&mut self) {
        println!("Dropping metrics, publishing metrics");
        self.flush_metrics();
    }
}
Enter fullscreen mode Exit fullscreen mode

Unit tests

Business logic isn't very complex, so I can create a few test cases with basic assertions.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn should_create_metrics() {
        let mut metrics = Metrics::new("test_namespace", "service", "dummy_service");
        metrics.add_metric("test_metric_count", MetricUnit::Count, 1.0);
        metrics.add_metric("test_metric_seconds", MetricUnit::Seconds, 22.0);

        let log = metrics.format_metrics();

        assert_eq!(log.aws.cloud_watch_metrics[0].namespace, "test_namespace");
        assert_eq!(log.aws.cloud_watch_metrics[0].metrics[0].name, "test_metric_count");
        assert_eq!(log.aws.cloud_watch_metrics[0].metrics[0].unit, MetricUnit::Count);
        assert_eq!(
            log.aws.cloud_watch_metrics[0].metrics[0].storage_resolution,
            60
            );
        assert_eq!(log.metrics_values.0.get("test_metric_count"), Some(&1.0));
        assert_eq!(
            log.aws.cloud_watch_metrics[0].metrics[1].name,
            "test_metric_seconds"
        );
        assert_eq!(log.aws.cloud_watch_metrics[0].metrics[1].unit, MetricUnit::Seconds);
        assert_eq!(
            log.aws.cloud_watch_metrics[0].metrics[1].storage_resolution,
            60
        );
        assert_eq!(log.dimensions.0.len(), 1);
    }

    #[test]
    fn should_handle_duplicated_metric() {
        let mut metrics = Metrics::new("test", "service", "dummy_service");
        metrics.add_metric("test", MetricUnit::Count, 2.0);
        metrics.add_metric("test", MetricUnit::Count, 1.0);

        assert_eq!(metrics.metrics.len(), 1);
    }

    #[test]
    fn should_not_fail_over_100_metrics() {
        let mut metrics = Metrics::new("test", "service", "dummy_service");
        for i in 0..100 {
            metrics.add_metric(&format!("metric{i}"), MetricUnit::Count, i as f64);
        }

        assert_eq!(metrics.metrics.len(), 100);
        metrics.add_metric("over_100", MetricUnit::Count, 11.0);
        assert_eq!(metrics.metrics.len(), 1);
    }

    #[test]
    fn should_fail_if_over_30_dimensions() {
        let mut metrics = Metrics::new("test", "service", "dummy_service");
        for i in 0..29 {
            metrics
                .try_add_dimension(&format!("key{i}"), &format!("value{i}"))
                .unwrap();
        }

        match metrics.try_add_dimension("key31", "value31") {
            Ok(_) => assert!(false, "expected error"),
            Err(_) => assert!(true),
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Example

In the examples directory, I created the basic lambda function with AWS SAM.

// ...
async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
    // Extract some useful info from the request
    let command = event.payload.command;

    let mut metrics = Metrics::new("custom_lambdas", "service", "dummy_service");

    metrics.try_add_dimension("application", "customer_service");

    metrics.add_metric("test_count", MetricUnit::Count, 10.4);

    metrics.add_metric("test_seconds", MetricUnit::Seconds, 15.0);

    metrics.add_metric("test_count", MetricUnit::Count, 10.6);

    // Prepare the response
    let resp = Response {
        req_id: event.context.request_id,
        msg: format!("Command {}.", command),
    };

    // Return `Response` (it will be serialized to JSON automatically by the runtime)
    Ok(resp)
}
// ...
Enter fullscreen mode Exit fullscreen mode

After sam build && sam deploy we can test the function from the console

Image description

As expected, there are two logs to the metrics. The first was emitted when we added the test_count metric for the second time, and the last one was emitted when the function finished.

Image description

Finally, I can see metrics added to the CloudWatch

Image description

Cleaning up

The library works, which is great, but there are probably things to improve. Let's run clippy - the great Rust linter.

cargo clippy -- -D clippy::pedantic
Enter fullscreen mode Exit fullscreen mode

Image description

The pedantic linter is pretty opinionated, but this is totally ok for me. All right, let's improve the code.

Panics

Image description

The new function won't panic, since there are no dimensions added yet, so adding the first one is safe. This is a good place to use an allow statement to make it clear, that this behavior is intentionally handled this way.

impl Metrics {
    #[allow(clippy::missing_panics_doc)]
    pub fn new(namespace: &str, dimension_key: &str, dimension_value: &str) -> Self {
        let mut metrics = Self {
            dimensions: Dimensions(HashMap::new()),
            namespace: Namespace(namespace.to_string()),
            metrics: Vec::new(),
        };
        // UNWRAP: for new metrics there is no risk of reaching max number of dimensions
        metrics
            .try_add_dimension(dimension_key, dimension_value)
            .unwrap();
        metrics
    }
Enter fullscreen mode Exit fullscreen mode

The second panic is more interesting. The into function for CloudWatchMetricsLog might fail if Serialize decides to panic, or there are HashMaps with non-string keys.
Instead of Into I need to implement TryInto. Not to say that I shouldn't create a bare into function, but implement TryInto trait.

impl TryInto<String> for CloudWatchMetricsLog {
    type Error = String;

    fn try_into(self) -> Result<String, Self::Error> {
        serde_json::to_string(&self).map_err(|err| err.to_string())
    }
}
Enter fullscreen mode Exit fullscreen mode

For now, I leave Error as a String, because I am not going to return it to the caller. Instead, I will only print an error

/// Flushes the metrics to stdout in a single payload
    /// If an error occurs during serialization, it will be printed to stderr
    pub fn flush_metrics(&mut self) {
        let serialized_metrics: Result<String, _> = self.format_metrics().try_into();

        match serialized_metrics {
            Ok(payload) => println!("{payload}"),
            Err(err) => eprintln!("Error when serializing metrics: {err}"),
        }
        self.metrics = Vec::new();
    }
Enter fullscreen mode Exit fullscreen mode

must_use and other

Clippy (pedantic) checks public functions as candidates for #[must_use] attribute. The new function for Metrics feels like good match.

impl Metrics {
    #[allow(clippy::missing_panics_doc)]
    #[must_use]
    pub fn new(namespace: &str, dimension_key: &str, dimension_value: &str) -> Self {
// ...
Enter fullscreen mode Exit fullscreen mode

I have also followed other clippy suggestions, including docs formatting. Speaking of what ....

Documentation

Rust has a great story for creating documentation. Doc comments let use markdown, and eventually, are transformed into the web page.
After adding comments to the crate in general, and to the public structs and functions we have pretty nice docs out-of-the-box.

I run

cargo doc --open --lib 
Enter fullscreen mode Exit fullscreen mode

The browser automatically opens a page with my docs:

Image description

Publish crate

Ok, now I am ready to publish the first version of the created lib. It is exciting!

I change the name of the lig to lambda-helpers-metrics, and set the version to the 0.1.0-alpha.

After creating an account on crates.io and email verification I am set up to publish crates.

Image description

πŸŽ‰ πŸŽ‰ πŸŽ‰

Now I can run cargo add lambda_helpers_metrics in any lambda function project and add some metrics. It feels nice :)

Summary

I built a simple library for creating metrics using AWS EMF spec. Thanks to that we can avoid expensive calls with putMetrics and simply log data in the specific shape to the console.

Clippy helped me to catch a bunch of possible mistakes.

Finally, I published the crate to the crates.io so it can be added to the project with cargo add

Next steps

There are some things I would like to work on next

  • defining Errors with the proper types
  • tracking cold starts
  • add CI/CD

Please add a comment if you see more useful features to be added.

Thanks!

Top comments (0)