Oli Makhasoeva for bytewax

Originally published at bytewax.io

How We Detect Anomalies In Our AWS Infrastructure (And Have Peaceful Nights)

Introduction

Anyone using a cloud provider wants to monitor their system and detect anomalies in usage. We run some internal data services, our website/blog, and a few demo clusters on AWS, and we wanted a low-maintenance way to monitor that infrastructure for issues, so we took the opportunity to dogfood Bytewax, of course :).

In this blog post, we will walk you through the process of building a cloud-based anomaly detection system using Bytewax, Redpanda, and Amazon Web Services (AWS). Our goal is to create a dataflow that detects anomalies in EC2 instance CPU utilization. To achieve this, we will collect usage data from AWS CloudWatch using Logstash and store it using Redpanda, a Kafka-compatible streaming data platform. Finally, we will use Bytewax, a Python stream processor, to build our anomaly detection system.

This is exactly the same infrastructure we use internally at Bytewax and, in fact, we haven't touched it for months!

Setting Up the Required Infrastructure on AWS

Before we begin, ensure that you have the following prerequisites set up:

  • AWS CLI configured with admin access
  • Helm
  • Docker
  • A Kubernetes cluster running in AWS and kubectl configured to access it

Configuring Kubernetes and Redpanda

In this section, we will configure Kubernetes and Redpanda using the provided code snippets. Make sure you have a running Kubernetes cluster in AWS and kubectl configured to access it.

Step 1: Set up a namespace

Create a new namespace for Redpanda and set it as the active context:

kubectl create ns redpanda-bytewax


kubectl config set-context --current --namespace=redpanda-bytewax

Step 2: Install Cert-Manager and Redpanda Operator

The Redpanda operator requires cert-manager to create certificates for TLS communication. To install cert-manager with Helm:

helm repo add jetstack https://charts.jetstack.io && \
helm repo update && \
helm install \
  cert-manager jetstack/cert-manager \
  --namespace cert-manager \
  --create-namespace \
  --version v1.4.4 \
  --set installCRDs=true

Fetch the latest Redpanda Operator version, add the Redpanda Helm repo, and install the Redpanda Operator:

export VERSION=$(curl -s https://api.github.com/repos/redpanda-data/redpanda/releases/latest | jq -r .tag_name)


helm repo add redpanda https://charts.vectorized.io/ && helm repo update


kubectl apply -k https://github.com/redpanda-data/redpanda/src/go/k8s/config/crd?ref=$VERSION


helm install redpanda-operator redpanda/redpanda-operator --namespace redpanda-system --create-namespace --version $VERSION

Step 3: Create Redpanda cluster

Save the following YAML configuration in a file named 3_node_cluster.yaml:

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: three-node-cluster
spec:
  image: "vectorized/redpanda"
  version: "latest"
  replicas: 3
  resources:
    requests:
      cpu: 1
      memory: 1.2Gi
    limits:
      cpu: 1
      memory: 1.2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    schemaRegistry:
      port: 8081
    adminApi:
    - port: 9644
    developerMode: true

Apply the Redpanda cluster configuration:

kubectl apply -f ./3_node_cluster.yaml

Check the status of Redpanda pods:

kubectl get po -lapp.kubernetes.io/component=redpanda

Export the broker addresses:

export BROKERS=`kubectl get clusters three-node-cluster -o=jsonpath='{.status.nodes.internal}' | jq -r 'join(",")'`

Step 4: Set up topics

Run an rpk container to create and manage topics:

kubectl run rpk-shell --rm -i --tty --image vectorized/redpanda --command -- /bin/bash

In the rpk terminal, export the broker addresses:

export BROKERS=three-node-cluster-0.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-1.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-2.three-node-cluster.redpanda-bytewax.svc.cluster.local.

View the cluster information:

rpk --brokers $BROKERS cluster info

Create two topics with 5 partitions each:

rpk --brokers $BROKERS topic create ec2_metrics -p 5


rpk --brokers $BROKERS topic create ec2_metrics_anomalies -p 5

List the topics:

rpk --brokers $BROKERS topic list

Consume messages from the ec2_metrics topic:

rpk --brokers $BROKERS topic consume ec2_metrics -o start

Exporting CloudWatch EC2 Metrics to our Redpanda Cluster with Logstash

Logstash is an open-source data processing pipeline that can ingest data from multiple sources, transform it, and send it to various destinations, such as Redpanda. In this case, we'll use Logstash to collect EC2 metrics from CloudWatch and send them to our Redpanda cluster for further processing.

Logstash Permissions

First, we need to create an AWS policy and user with the required permissions for Logstash to access CloudWatch and EC2. Save the following JSON configuration in a file named cloudwatch-logstash-policy.json:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1444715676000",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:GetMetricStatistics",
                "cloudwatch:ListMetrics"
            ],
            "Resource": "*"
        },
        {
            "Sid": "Stmt1444716576170",
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeInstances"
            ],
            "Resource": "*"
        }
    ]
}

Now we can create the policy and user, and attach the policy to the user:

aws iam create-policy --policy-name CloudwatchLogstash --policy-document file://cloudwatch-logstash-policy.json
aws iam create-user --user-name logstash-user


export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)


aws iam attach-user-policy --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/CloudwatchLogstash --user-name logstash-user

To provide access, we can create Kubernetes secrets for the AWS access key and secret access key:

kubectl create secret generic aws-secret-access-key --from-literal=value=$(aws iam create-access-key --user-name logstash-user | jq -r .AccessKey.SecretAccessKey)


kubectl create secret generic aws-access-key-id --from-literal=value=$(aws iam list-access-keys --user-name logstash-user --query "AccessKeyMetadata[0].AccessKeyId" --output text)

Now we can create an Amazon Elastic Container Registry (ECR) repository to store the custom Logstash image:

aws ecr create-repository --repository-name redpanda-bytewax


export REPOSITORY_URI=$(aws ecr describe-repositories --repository-names redpanda-bytewax --output text --query "repositories[0].repositoryUri")

Next, we create a custom Logstash image with the CloudWatch input plugin installed. Create a Dockerfile named logstash-Dockerfile that installs the plugin in a RUN step:

FROM docker.elastic.co/logstash/logstash:7.17.3
RUN bin/logstash-plugin install logstash-input-cloudwatch

Finally, we build and push the Logstash image to the ECR repository:

docker build -f logstash-Dockerfile -t $REPOSITORY_URI:logstash-cloudwatch .


export AWS_REGION=us-west-2


aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com


docker push $REPOSITORY_URI:logstash-cloudwatch

Deploy Logstash on Kubernetes

Now that we have our custom Logstash image, we will deploy it on Kubernetes using the Helm chart provided by Elastic. First, we need to gather some information and create a logstash-values.yaml file with the necessary configuration.

Run the following commands to obtain the required information:

echo $REPOSITORY_URI


echo $AWS_REGION


echo $BROKERS | sed -e 's/local\./local\:9092/g'

Create a logstash-values.yaml file and replace the placeholders (shown with <>) with the information obtained above:

image: "<YOUR REPOSITORY URI>"
imageTag: "logstash-cloudwatch"
imagePullPolicy: "Always"

persistence:
  enabled: true

logstashConfig:
  logstash.yml: |
    http.host: 0.0.0.0
    xpack.monitoring.enabled: false

logstashPipeline:
  uptime.conf: |
    input {
      cloudwatch {
        namespace => "AWS/EC2"
        metrics => ["CPUUtilization"]
        region => "<YOUR AWS REGION>"
        interval => 300
        period => 300
      }       
    }
    filter {
      mutate {
        add_field => {
          "[index]" => "0"
          "[value]" => "%{maximum}"
          "[instance]" => "%{InstanceId}"                      
        }
      }
    }
    output {
        kafka {
          bootstrap_servers => "<YOUR REDPANDA BROKERS>"
          topic_id => 'ec2_metrics'
          codec => json
        }
    }

extraEnvs:
  - name: 'AWS_ACCESS_KEY_ID'
    valueFrom:
      secretKeyRef:
        name: aws-access-key-id
        key: value
  - name: 'AWS_SECRET_ACCESS_KEY'
    valueFrom:
      secretKeyRef:
        name: aws-secret-access-key
        key: value

With the logstash-values.yaml file ready, install the Logstash Helm chart:

helm upgrade --install logstash elastic/logstash -f logstash-values.yaml

To verify that Logstash is exporting the EC2 metrics to the Redpanda cluster, open a terminal with rpk again and consume the ec2_metrics topic:

rpk --brokers $BROKERS topic consume ec2_metrics -o start

Use CTRL-C to quit the rpk terminal when you're done.

Building a Dataflow to Detect Anomalies with Bytewax

With our infrastructure in place, it's time to build a dataflow to detect anomalies. We will use Bytewax and Waxctl to define and deploy a dataflow that processes the EC2 instance CPU utilization data stored in the Redpanda cluster.

Anomaly Detection with Half Space Trees

Half Space Trees (HST) is an unsupervised machine learning algorithm used for detecting anomalies in streaming data. The algorithm is designed to efficiently handle high-dimensional and high-velocity data streams. HST builds a set of binary trees to partition the feature space into half spaces, where each tree captures a different view of the data. By observing the frequency of points falling into each half space, the algorithm can identify regions that are less dense than others, suggesting that data points within those regions are potential anomalies.

In our case, we will use HST to detect anomalous CPU usage in EC2 metrics. We'll leverage the Python library River, which provides an implementation of the HST algorithm, and Bytewax, a platform for creating data processing pipelines.
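
To get a feel for how scoring works before wiring it into a dataflow, here is a minimal standalone sketch using River's HalfSpaceTrees on a toy stream of normalized CPU readings; the readings and parameter values below are illustrative, not taken from the pipeline we deploy later.

from river import anomaly

# Illustrative parameters; the dataflow below chooses its own values.
hst = anomaly.HalfSpaceTrees(n_trees=5, height=3, window_size=5, seed=42)

# A toy stream of normalized CPU utilization readings (0.0 - 1.0),
# ending with an obvious spike.
readings = [0.10, 0.12, 0.11, 0.09, 0.13, 0.10, 0.95]

for cpu in readings:
    features = {"value": cpu}
    score = hst.score_one(features)  # anomaly score in [0, 1]
    hst.learn_one(features)          # update the trees with this observation
    print(f"cpu={cpu:.2f} score={score:.3f}")

Points that land in sparsely populated regions of the trees receive higher scores; in the dataflow below we treat scores above a fixed threshold as anomalies.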

Building the Dataflow for Anomaly Detection

To create our dataflow, we'll first import the necessary libraries and set up Kafka connections. The following code snippet demonstrates how to create a dataflow with River and Bytewax to consume EC2 metrics from Kafka and detect anomalous CPU usage using HST:

import json
import os
import datetime as dt
from pathlib import Path

from bytewax.connectors.kafka import KafkaInput, KafkaOutput
from bytewax.dataflow import Dataflow
from bytewax.recovery import SqliteRecoveryConfig

from river import anomaly

kafka_servers = os.getenv("BYTEWAX_KAFKA_SERVER", "localhost:9092")
kafka_topic = os.getenv("BYTEWAX_KAFKA_TOPIC", "ec2_metrics")
kafka_output_topic = os.getenv("BYTEWAX_KAFKA_OUTPUT_TOPIC", "ec2_metrics_anomalies")

# Define the dataflow object and kafka input.
flow = Dataflow()
flow.input("inp", KafkaInput(kafka_servers.split(","), [kafka_topic]))

# normalize CPU utilization from a percentage to a 0-1 value and key by instance id
def group_instance_and_normalize(key__data):
  _, data = key__data
  data = json.loads(data)
  data["value"] = float(data["value"]) / 100
  return data["instance"], data

flow.map(group_instance_and_normalize)
# ("c6585a", {"index": "1", "value": "0.11", "instance": "c6585a"})

# Stateful operator for anomaly detection
class AnomalyDetector(anomaly.HalfSpaceTrees):

Our anomaly detector inherits from the HalfSpaceTrees class in the River package, which takes the following parameters:

  • n_trees – defaults to 10
  • height – defaults to 8
  • window_size – defaults to 250
  • limits (Dict[Hashable, Tuple[float, float]]) – defaults to None
  • seed (int) – defaults to None


  def __init__(self, *args, **kwargs):
      super().__init__(*args, n_trees=5, height=3, window_size=5, seed=42, **kwargs)

  def update(self, data):
      self.learn_one({"value": data["value"]})
      data["score"] = self.score_one({"value": data["value"]})
      if data["score"] > 0.7:
          data["anom"] = 1
      else:
          data["anom"] = 0
      return self, (
          data["index"],
          data["timestamp"],
          data["value"],
          data["score"],
          data["anom"],
      )

flow.stateful_map("detector", lambda: AnomalyDetector(), AnomalyDetector.update)
# (("c6585a", {"index": "1", "value":0.08, "instance": "fe7f93", "score":0.02}))

# filter out non-anomalous values
flow.filter(lambda x: bool(x[1][4]))

# serialize the full anomalous record and write it to the output topic
flow.map(lambda x: (x[0], json.dumps(x[1])))
flow.output("output", KafkaOutput(kafka_servers.split(","), kafka_output_topic))

In this dataflow, we first read data from Kafka and deserialize the JSON message. We then normalize the CPU usage values and group them by the instance ID. Next, we apply the AnomalyDetector class inside a stateful operator, which calculates the anomaly score for each data point using HST. We set a threshold for the anomaly score (0.7 in this example) and mark data points as anomalous if their scores exceed the threshold. Finally, we filter out non-anomalous values and output the anomalous data points to a separate Kafka topic.

Using this dataflow, we can continuously monitor EC2 metrics and detect anomalous CPU usage, helping us identify potential issues in our infrastructure.

Creating a Dataflow docker image

Create a file named dataflow-Dockerfile that installs the dataflow's Python dependencies on top of the Bytewax base image:

FROM bytewax/bytewax:0.16.0-python3.9
RUN /venv/bin/pip install river==0.10.1 pandas confluent-kafka


With the image defined, build it and push it to the ECR repository:

docker build -f dataflow-Dockerfile -t $REPOSITORY_URI:dataflow .


docker push $REPOSITORY_URI:dataflow

Deploying the Dataflow

To deploy the dataflow, we'll use the Bytewax command-line tool, waxctl. There are two options for deploying the dataflow, depending on how you have set up your Kafka server environment variable. When we deploy, we set the number of processes (the -p flag) to 5 to match the number of partitions we chose when we initially created our Redpanda topics.

Option 1: Generate waxctl command

Use the following command to generate the waxctl command with the appropriate environment variables:

echo"
waxctl df deploy ./dataflow.py \\
  --name ec2-cpu-ad \\
  -p 5 \\
  -i $REPOSITORY_URI \\
  -t dataflow \\
  -e '\"BYTEWAX_KAFKA_SERVER=$BROKERS\"' \\
  -e BYTEWAX_KAFKA_TOPIC_GROUP_ID=dataflow_group \\
  --debug
"

This will output the waxctl command with the correct Kafka server values. Copy the output and run it to deploy the dataflow.

Option 2: Hardcoded BYTEWAX_KAFKA_SERVER value

If you prefer to hardcode the Kafka server values, use the following command to deploy the dataflow:

waxctl df deploy ./dataflow.py \
  --name ec2-cpu-ad \
  -p 5 \
  -i $REPOSITORY_URI \
  -t dataflow \
  -e '"BYTEWAX_KAFKA_SERVER=three-node-cluster-0.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-1.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-2.three-node-cluster.redpanda-bytewax.svc.cluster.local."' \
  -e BYTEWAX_KAFKA_TOPIC_GROUP_ID=dataflow_group \
  --debug

Now that the dataflow is deployed, give it some time to process metrics; then you'll be able to consume from the anomalies topic to see any detected anomalies:

rpk --brokers $BROKERS topic consume ec2_metrics_anomalies -o start

As a next step, you could deploy a dataflow that consumes from the anomalies topic and alerts you in Slack! Or add Rerun, as we demonstrated in a previous blog post, to visualize the anomalies.
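
For instance, a minimal sketch of a Slack-alerting dataflow might look like the following. This is an assumption-laden example, not part of the deployed pipeline: it reuses the Bytewax 0.16 KafkaInput connector from above, assumes a SLACK_WEBHOOK_URL environment variable pointing at a Slack incoming webhook you have created, and uses StdOutput simply to give the dataflow a visible sink.

import json
import os
import urllib.request

from bytewax.connectors.kafka import KafkaInput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow

kafka_servers = os.getenv("BYTEWAX_KAFKA_SERVER", "localhost:9092")
# Hypothetical env var holding your Slack incoming webhook URL.
webhook_url = os.getenv("SLACK_WEBHOOK_URL")

def alert_slack(key__payload):
    key, payload = key__payload
    instance = key.decode() if isinstance(key, bytes) else key
    record = json.loads(payload)
    text = f"Anomalous CPU usage on instance {instance}: {record}"
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": text}).encode(),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)  # post the alert to Slack
    return text

flow = Dataflow()
flow.input("anomalies", KafkaInput(kafka_servers.split(","), ["ec2_metrics_anomalies"]))
flow.map(alert_slack)
flow.output("out", StdOutput())

You could package and deploy this with waxctl the same way as the detection dataflow above.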

Conclusion

In this blog post, we have demonstrated how to set up a system for monitoring EC2 metrics and detecting anomalous CPU usage. By leveraging tools like Logstash, Redpanda, River, and Bytewax, we've created a robust and scalable pipeline for processing and analyzing streaming data.

This system provides a range of benefits, including:

  1. Efficiently processing high-dimensional and high-velocity data streams
  2. Using the Half Space Trees unsupervised machine learning algorithm for detecting anomalies in streaming data
  3. Continuously monitoring EC2 metrics and identifying potential issues in the infrastructure

With this setup, you can effectively monitor your EC2 instances and ensure that your infrastructure is running smoothly, helping you proactively address any issues that may arise.

That's it! You now have a working cloud-based anomaly detection system using Bytewax, Redpanda, and AWS. Feel free to adapt this setup to your specific use case and explore the various features and capabilities offered by these tools.
