Jane Radetska

Set up Knative Eventing with Kafka from scratch, scale based on event volume, and monitor

I am going to describe how to create a new Kubernetes cluster and install Knative Eventing, Kafka flavor, in it. I am actually going to create two Kafka clusters with mirroring enabled, so I can perform some experiments later on.

I am also going to describe steps you can follow to ensure Knative scales well when message volume increases, and I am going to point to resources on how to install monitoring for such a cluster.

A Kubernetes cluster with Knative Eventing should fit within Google Cloud trial quotas, but the monitoring and scaling workloads on top of that might not.

Cluster creation

Create a new Kubernetes cluster: one zone, 4-6 nodes, each node compute-optimized (c2-standard-4 at least), with a 100 GB disk (pd-ssd is best, but pd-standard or pd-balanced also work). The trial quota allows 4 c2-standard-4 nodes.
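For reference, such a cluster can be created with the gcloud CLI; the cluster name and zone below are placeholders:

gcloud container clusters create knative-kafka \
  --zone us-central1-a \
  --num-nodes 4 \
  --machine-type c2-standard-4 \
  --disk-type pd-ssd \
  --disk-size 100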

Installing Kafka and Knative

Create namespace knative-eventing.
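In kubectl terms:

kubectl create namespace knative-eventing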

Follow the Strimzi quickstart to install Kafka in the knative-eventing namespace, but use a different Kafka cluster definition, see below. Knative workloads expect to run in the knative-eventing namespace, otherwise issues arise. And it's easier to keep Knative and Kafka in one namespace anyway.
Use kafka-cluster.yaml as the Kafka cluster resource instead of the one used in the Strimzi quickstart (kafka-single-persistent.yaml). If you're not limited on disk, it's best to set the storage size to 50Gi or 100Gi in kafka-cluster.yaml, and at least 25Gi for ZooKeeper storage. On the trial quota you're limited to 20Gi for Kafka and 10Gi for ZooKeeper (that's when running two Kafka clusters; with one, you can allocate more). A sketch of such a resource follows.
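For orientation, here's roughly what such a cluster resource can look like (a single-node Strimzi cluster named my-cluster, with trial-quota storage sizes; adjust per the note above):

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: knative-eventing
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:
      type: persistent-claim
      size: 20Gi
      deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}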

Follow the Knative docs to install Knative Eventing. Install all the Kafka components too: Kafka sink, Kafka broker, Kafka event source. Use this publication to configure the broker config to use the Kafka broker class (replication factor 1).
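The linked publication has the details; the end result is a kafka-broker-config ConfigMap along these lines (the bootstrap server matches the Strimzi cluster sketched above):

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "my-cluster-kafka-bootstrap.knative-eventing:9092"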

Also make sure to install the Kafka source. kafka-source-dispatcher will have 0 pods until some Kafka sources are created.

Autoscaling Knative

On the GCP trial quota, you likely won't have space for the KEDA controller or upscaled Knative workloads. Otherwise:

Follow this blog to configure HA for Knative workloads. I would set HA to 6 though, and keep an eye on memory/CPU consumption of the workloads if you've got significant event traffic going through the system; otherwise there's going to be a slowdown in event delivery.
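The linked blog covers the proper HA configuration; as a quick sketch, the data-plane deployments can also be scaled directly (the workload names below may differ by version, and an operator-managed install may reconcile the replica count back):

kubectl -n knative-eventing scale deployment kafka-broker-receiver --replicas=6
kubectl -n knative-eventing scale deployment kafka-sink-receiver --replicas=6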

Install the scaling controller for Kafka sources, the KEDA autoscaler. HPA parameters are controlled by annotations on the KafkaSource YAML definition:

metadata:
  annotations:
    autoscaling.knative.dev/class: keda.autoscaling.knative.dev
    autoscaling.knative.dev/minScale: "0"
    autoscaling.knative.dev/maxScale: "5"
    keda.autoscaling.knative.dev/pollingInterval: "30"
    keda.autoscaling.knative.dev/cooldownPeriod: "30"
    keda.autoscaling.knative.dev/kafkaLagThreshold: "10"

Kafka of course has its own parallelism mechanism: adding more brokers, which enables a higher partition count for a given topic.

Monitoring Knative and Kafka

Follow this publication to set up Prometheus monitoring for the Kafka cluster. Datadog has a nice description of what those metrics mean.

Knative has a tutorial on how to set up monitoring. However, I ended up creating a Service and a ServiceMonitor by hand for the Knative workloads to be able to monitor them.

Here's an example Service and ServiceMonitor for kafka-sink-receiver:

apiVersion: v1
kind: Service
metadata:
  name: knative-sink-service
  labels:
    app: knative-sink-service
spec:
  selector:
    app: kafka-sink-receiver
  ports:
    - name: http-metrics
      protocol: TCP
      port: 9090
      targetPort: http-metrics
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: knative-sink-service-monitor
  labels:
    app: knative-sink-service-mon
spec:
  selector:
    matchLabels:
      app: knative-sink-service
  endpoints:
  - port: http-metrics

Knative exposes a couple of its own metrics (like processing delays) and also exposes a huge amount of Kafka metrics for its consumers/producers. I ended up curl-ing the Knative Services on the metrics port, and scripting a tool that helps create a primitive Grafana dashboard from a list of metric names and the uid of a datasource. See the readme on how to use the tool. Alternatively, replace the datasource uid in dashboard-*.json with your datasource uid, and make sure the job selectors in the dashboard JSON match the service name that sends metrics.
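For example, with the Service above in place, you can eyeball the raw metrics from a throwaway curl pod (any image with curl works):

kubectl -n knative-eventing run metrics-probe -ti --rm --restart=Never --image=curlimages/curl -- -s http://knative-sink-service:9090/metrics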

The Knative dashboards, together with Kafka's dashboards, shed light on almost any aspect of what's going on in the system.

More tuning

Some useful production-grade considerations for Knative can be found here.

Knative exposes consumer and producer configs for brokers and other workloads as ConfigMaps. I had more luck with setting

auto.offset.reset=latest
enable.auto.commit=true
# commit interval of about 1.5 seconds, i.e. heartbeat interval / 2
auto.commit.interval.ms=1500

for the Knative sink-receiver config.
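The exact ConfigMap names vary by version; list the data-plane ConfigMaps and edit the sink one (the name below is an assumption, check the listing first):

kubectl get configmaps -n knative-eventing | grep data-plane
kubectl -n knative-eventing edit configmap config-kafka-sink-data-plane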

More on Kafka consumer and producer tuning

https://strimzi.io/blog/2021/01/07/consumer-tuning/
https://strimzi.io/blog/2020/10/15/producer-tuning/

Make sure it works

You can create a Kafka topic whose messages are transferred to another topic using the Knative machinery:

input-topic -> knative source -> knative broker -> knative trigger (opt: filter by message headers) -> knative sink -> output-topic

Example definitions to use are below, and an apply sequence is sketched after them. Apply the topics and the broker, and make sure they've got status Ready (kubectl get kafkatopic -n knative-eventing, kubectl get broker -n knative-eventing). Then apply the sink and the source, and also make sure they're ready. Last, apply the trigger.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: input-topic
  namespace: knative-eventing
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: output-topic
  namespace: knative-eventing
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-broker
  namespace: knative-eventing
  annotations:
    eventing.knative.dev/broker.class: Kafka
spec: {}
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: input-topic-source
  namespace: knative-eventing
# keda autoscaler annotations here if using keda
# see Autoscaling section of blog, above
spec:
  consumerGroup: input-topic-source-group
  bootstrapServers:
  - my-cluster-kafka-bootstrap.knative-eventing:9092
  topics:
  - input-topic
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: my-broker
---
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: output-topic-sink
  namespace: knative-eventing
spec:
  topic: output-topic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.knative-eventing:9092
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: output-trigger
  namespace: knative-eventing
spec:
  broker: my-broker
  # you can define a filter for messages based on headers; incoming Kafka headers get the `kafkaheader` prefix. So if a message was sent to `input-topic` with the header `Ce-my-header: my-value`, its filter here will be `kafkaheadercemyheader: my-value`
  # filter:
  #  attributes:
  #    kafkaheadercemyheader: my-value
  subscriber:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: KafkaSink
      name: output-topic-sink
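Assuming the definitions above are saved into files (the file names are arbitrary), the apply-and-check sequence looks like:

kubectl apply -f topics.yaml -f broker.yaml
kubectl get kafkatopic -n knative-eventing
kubectl get broker -n knative-eventing
kubectl apply -f sink.yaml -f source.yaml
kubectl get kafkasink -n knative-eventing
kubectl get kafkasource -n knative-eventing
kubectl apply -f trigger.yaml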

Here's a primitive Python web app that simply logs each message upon arrival. You can use the echo app as the destination sink instead of the second topic. The Deployment for the echo web app should be in the knative-eventing namespace and expose a ClusterIP-type Service that maps port 80 to 8083, as sketched below. If you're not familiar with how to create a Deployment and a Service for it, use the k8s docs or the Google Console "new deployment" button (you've got to upload the image to Docker Hub or another artifact registry first, though).
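A sketch of that ClusterIP Service (the name and the app: echo selector are placeholders, match them to your Deployment's pod labels):

apiVersion: v1
kind: Service
metadata:
  name: echo-service
  namespace: knative-eventing
spec:
  type: ClusterIP
  selector:
    app: echo          # placeholder, must match the echo Deployment's pod labels
  ports:
    - port: 80         # port other cluster workloads call the app on
      targetPort: 8083 # port the echo app listens on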

Let's send some messages.

Launch a listener for output-topic:

kubectl -n knative-eventing run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.37.0-kafka-3.5.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic output-topic --from-beginning --property print.headers=true

In another tab, launch a producer for input-topic:

kubectl -n knative-eventing run kafka-producer -ti --image=quay.io/strimzi/kafka:0.37.0-kafka-3.5.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic input-topic --property parse.headers=true  --property headers.delimiter=\t --property headers.separator=, --property headers.key.separator=:

And post the following payload to input-topic:

Ce-my-header:my-value\t{"msg":"content"}

The same message should arrive on output-topic, with the original headers carrying the kafkaheader prefix:

ce_specversion:1.0,ce_id:...,ce_source:...,content-type:application/json; charset=utf-8,kafkaheadercemyheader:my-value {"msg":"content"}
