DEV Community

Cover image for Creating a Log Based Metric Exporter
Mat
Mat

Posted on • Originally published at cronusmonitoring.com

Creating a Log Based Metric Exporter

This blog post shows how I created a simple log-based exporter that measures the occurrence of logs in Kubernetes containers.

Why

Major cloud providers like GCP & AWS offer log based metrics but at scale they can become extremely cost prohibitive. My aim was to create a cheaper solution that converts the logs into prometheus metrics out of the box, with the added benefit that they are measured in real time.

How it works

  1. You specify the logs you want to measure, the containers that they occur in and the namespaces where the containers run. The exporter will automatically discover all matching pods as they appear
  2. The exporter measures the metrics in real time. Every time Prometheus scrapes the /metrics endpoint, the gauges are reset.

Generating some logs

This quick little program will spit out copious amounts of logs. I've containerized it and deployed to a local cluster

package main

import (
    "fmt"
    "time"
)

// main prints an endless stream of numbered log lines, one every 50ms, so
// that there is something for the exporter to measure.
func main() {
    const pause = 50 * time.Millisecond

    for n := 0; ; n++ {
        line := fmt.Sprintf("log entry %v", n)
        fmt.Println(line)

        // Wrap the counter once it has passed 10000; the loop's increment
        // then makes the next printed value 1.
        if n > 10000 {
            n = 0
        }

        time.Sleep(pause)
    }
}
Enter fullscreen mode Exit fullscreen mode

Fetching these logs using the kube api

// StreamLogs follows the log stream of pod in namespace ns and passes every
// chunk it reads to callback until the stream fails or ctx is cancelled.
//
// The stream is opened with Follow enabled and read in chunks of at most 4096
// bytes. Each callback invocation receives whatever a single Read returned,
// so a chunk is not guaranteed to line up with log line boundaries. The slice
// handed to callback points into a buffer that is overwritten by the next
// read; a callback that needs to keep the data must copy it.
//
// It returns nil when reading stopped because ctx was cancelled, and the
// error from opening or reading the stream otherwise.
func StreamLogs(ctx context.Context, client *kubernetes.Clientset, ns string, pod string, callback LogCallback) error {
    opts := &corev1.PodLogOptions{
        Follow: true,
    }

    req := client.CoreV1().Pods(ns).GetLogs(pod, opts)
    stream, err := req.Stream(ctx)
    if err != nil {
        return err
    }
    defer stream.Close()

    buffer := make([]byte, 4096) // Adjust buffer size as needed
    for {
        n, readErr := stream.Read(buffer)

        // A Read may return data together with an error, so deliver what was
        // read before looking at the error. Zero-byte reads carry nothing to
        // measure and are skipped.
        if n > 0 {
            callback(ns, pod, buffer[:n])
        }

        if readErr != nil {
            // The read error may wrap the context error rather than be
            // identical to it, so also ask the context itself.
            if readErr == context.Canceled || ctx.Err() == context.Canceled {
                log.Println("Stream canceled")
                return nil
            }
            return readErr
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

This function uses a callback to parse the data outside of the function. Here's an example implementation of the callback function.

We're doing a couple of things.

  1. Initializing the StreamLogs function
  2. Defining a callback that will measure logs as they appear
  3. Incrementing metrics based on the outcome of the measurement
// monitorPodForRule streams the logs of one pod and updates the metric of
// rule for every chunk of log data received.
//
// A chunk that satisfies the rule increments the metric. A chunk that does
// not sets the metric to 0, but only when the EXPORT_ZERO environment
// variable is "true". The function returns when the log stream ends; errors
// are logged, not returned.
func monitorPodForRule(ctx context.Context, rule rules.Rule, ns string, pod string) {
    // Do nothing if monitoring was cancelled before it started.
    select {
    case <-ctx.Done():
        log.Infof("stopped monitoring %s/%s for rule %s", ns, pod, rule.Name)
        return
    default:
    }

    client, err := kube.GenClient()
    if err != nil {
        log.Errorf("creating kubernetes client to monitor %s/%s for rule %s: %v", ns, pod, rule.Name, err)
        return
    }

    // Read the flag once up front instead of on every chunk of log data.
    exportZero := os.Getenv("EXPORT_ZERO") == "true"

    callback := func(ns string, pod string, data []byte) {
        log.Debugf("namespace %s pod %s bytes processed %v", ns, pod, len(data))
        switch {
        case measureLogAgainstCondition(data, rule):
            m.IncrementMetric(rule, ns, pod)
        case exportZero:
            m.SetMetric(rule, ns, pod, 0)
        }
    }

    if err := kube.StreamLogs(ctx, client, ns, pod, callback); err != nil {
        log.Errorf("streaming logs of %s/%s for rule %s: %v", ns, pod, rule.Name, err)
    }
}
Enter fullscreen mode Exit fullscreen mode

The measurement is pretty simple: we iterate through each condition and check for substring matches.

// measureLogAgainstCondition reports whether data contains every condition
// of rule as a substring. A rule without conditions matches any data.
func measureLogAgainstCondition(data []byte, rule rules.Rule) bool {
    // Convert once: string(data) copies the bytes, so converting inside the
    // loop would make a fresh copy for every condition.
    text := string(data)
    for _, cond := range rule.Condition {
        if !strings.Contains(text, cond) {
            return false
        }
    }
    return true
}

Enter fullscreen mode Exit fullscreen mode

How to dynamically monitor pods as they appear

Kubernetes has the following hierarchy: Namespace --> Pod --> Container. Since pod names are random, we need to perform a lookup and find pods that contain the containers we wish to monitor.

The following function does exactly that.

// findPodsForRule looks up the pods that run at least one of the containers
// named by rule, restricted to the namespaces of the rule that exist in the
// cluster.
//
// The result maps each searched namespace to the names of its matching pods;
// a namespace that was searched without a match maps to an empty slice. Each
// pod is listed once, even when several of its containers match.
//
// An error is returned when the client cannot be created, the namespaces
// cannot be listed, none of the rule's namespaces exist, or no pod matches.
// A namespace whose containers cannot be listed is logged and skipped.
func findPodsForRule(ctx context.Context, rule rules.Rule) (map[string][]string, error) {
    log.Infof("finding pods for rule %s", rule.Name)

    // scan namespaces for containers
    client, err := kube.GenClient()
    if err != nil {
        return nil, err
    }

    namespaces, err := kube.ListNamespaces(ctx, client)
    if err != nil {
        return nil, err
    }

    // Keep only the namespaces of the rule that actually exist.
    validNamespaces := make([]string, 0)
    for _, ns := range rule.Namespace {
        for _, actualNS := range namespaces {
            if actualNS == ns {
                validNamespaces = append(validNamespaces, ns)
                break
            }
        }
    }

    if len(validNamespaces) == 0 {
        return nil, fmt.Errorf("rule %s no valid namespaces were found", rule.Name)
    }

    log.Infof("rule %s. %v of %v namespaces found", rule.Name, len(validNamespaces), len(rule.Namespace))

    // matchingPods collects the pod names of all namespaces; it is used for
    // the emptiness check and for logging.
    matchingPods := make([]string, 0)
    result := make(map[string][]string)
    for _, ns := range validNamespaces {
        info, err := kube.ListContainers(ctx, client, ns)
        if err != nil {
            log.Errorf("could not list containers for namespace %s: %v", ns, err)
            continue
        }

        pods, ok := info[ns]
        if !ok {
            log.Errorf("could not list containers for namespace %s", ns)
            continue
        }

        result[ns] = make([]string, 0)

        for podName, containers := range pods {
        containerScan:
            for _, container := range containers {
                for _, ruleContainer := range rule.Container {
                    if ruleContainer == container {
                        matchingPods = append(matchingPods, podName)
                        result[ns] = append(result[ns], podName)
                        // One matching container is enough. Stop scanning
                        // this pod so it is not recorded a second time.
                        break containerScan
                    }
                }
            }
        }
    }

    if len(matchingPods) == 0 {
        return nil, fmt.Errorf("no matching pods found for rule %s", rule.Name)
    }

    log.Infof("for rule %s found the following namespaces %v", rule.Name, validNamespaces)
    log.Infof("for rule %s found the following pods %v", rule.Name, matchingPods)

    return result, nil
}
Enter fullscreen mode Exit fullscreen mode

The last thing we need for the exporter is a way to bring monitoring for a pod up and down; we'll use a context to control this. Every 15 seconds we'll scan for pods that match. For any pod that no longer exists we'll cancel its monitoring, and for any new pod we'll start monitoring.

// monitorRule keeps the set of monitored pods for rule up to date until ctx
// is cancelled. It scans for matching pods immediately and then again every
// 15 seconds, passing each successful scan result to updateMonitoring.
//
// When a scan fails, the error is logged and the monitoring set is left as it
// was until the next scan.
func monitorRule(ctx context.Context, rule rules.Rule) {
    // refresh every 15 seconds
    ticker := time.NewTicker(15 * time.Second)
    defer ticker.Stop()

    for {
        if ctx.Err() != nil {
            return
        }

        pods, err := findPodsForRule(ctx, rule)
        if err != nil {
            log.Errorf("error finding pods for rule: %v", err)
        } else {
            updateMonitoring(ctx, rule, pods)
        }

        // Wait for the next tick, but return as soon as ctx is cancelled
        // instead of sitting out the rest of the interval.
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
        }
    }
}
Enter fullscreen mode Exit fullscreen mode
// updateMonitoring reconciles the pods monitored for rule with currentPods,
// a map from namespace to the names of the pods that currently match.
//
// Monitoring is started, in its own goroutine and with a cancellable child
// of ctx, for every pod that is not monitored yet for this rule. Monitoring
// is cancelled for every pod that was monitored for this rule but is no
// longer in currentPods. The active-metrics counter is adjusted on each
// start and stop. monitorState is held locked for the whole call.
func updateMonitoring(ctx context.Context, rule rules.Rule, currentPods map[string][]string) {
    monitorState.Lock()
    defer monitorState.Unlock()

    // monitorState.pods is shared by every call, whichever rule it is for.
    // Keys are therefore scoped by rule name: a pod matching several rules
    // gets one monitor per rule, and the cleanup below leaves the monitors of
    // other rules alone.
    // NOTE(review): assumes nothing outside this function looks entries up by
    // the bare podKey value; confirm against other users of monitorState.
    prefix := rule.Name + "\x00"

    // Create a set of current pods for quick lookup
    currentSet := make(map[string]struct{})
    for ns, pods := range currentPods {
        for _, pod := range pods {
            key := prefix + podKey(ns, pod)
            currentSet[key] = struct{}{}

            if _, monitored := monitorState.pods[key]; monitored {
                continue
            }

            // Start monitoring new pod
            podCtx, cancel := context.WithCancel(ctx)
            monitorState.pods[key] = monitoredPod{cancel: cancel}
            metrics.IncActiveMetrics()
            go monitorPodForRule(podCtx, rule, ns, pod)
        }
    }

    // Check for pods of this rule that are no longer current and cancel
    // their monitoring
    for key, mp := range monitorState.pods {
        if len(key) < len(prefix) || key[:len(prefix)] != prefix {
            continue // entry belongs to another rule
        }
        if _, current := currentSet[key]; current {
            continue
        }
        mp.cancel()
        metrics.DecActiveMetrics()
        delete(monitorState.pods, key)
    }
}
Enter fullscreen mode Exit fullscreen mode

Configuration

If you remember the log-generating app I made at the top, here I'll create a rule to measure the logs generated from it. This rule matches every time both "log entry" and "4" are detected in the incoming log stream.

[
  {
    "name": "log entry 400",
    "metric": "log_entry_400",
    "namespace": ["default"],
    "container": ["dummy"],
    "condition": ["4", "log entry"]
  }
]
Enter fullscreen mode Exit fullscreen mode

Deploying to Kubernetes

Here is a full working example of a k8s deployment manifest. Once it's deployed, we can see that it's picking up the log-based metrics.

log_based_metric{metric="log_entry_400",name="log entry 4",namespace="default",pod="dummy-59b956b77c-2h78c"} 0

log_based_metric{metric="log_entry_400",name="log entry 4",namespace="default",pod="dummy-59b956b77c-99rvv"} 73
Enter fullscreen mode Exit fullscreen mode

Thanks for reading!

Top comments (0)