I am going to describe how to create a new Kubernetes cluster and install Knative eventing, Kafka flavor, in it. I am actually going to create two Kafka clusters with mirroring enabled, to be able to perform some experiments later on.
I am also going to describe steps you can follow to ensure Knative scales well when message volume increases. And I am going to point to resources on how to install monitoring for such a cluster.
A Kubernetes cluster with Knative eventing should fit in the Google Cloud trial quotas, but monitoring and scaling workloads on top of that might not.
Cluster creation
Create a new Kubernetes cluster: one zone, 4-6 nodes, each node Standard compute-optimized (c2-standard-4 at least), 100 GB disk (best if pd-ssd, but pd-standard or pd-balanced work too). The trial quota allows 4 c2-standard-4 nodes.
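A minimal sketch of the matching gcloud command (the cluster name knative-kafka and the zone are just examples, adjust to yours):
# create a 4-node zonal cluster matching the specs above
gcloud container clusters create knative-kafka \
  --zone europe-west1-b \
  --num-nodes 4 \
  --machine-type c2-standard-4 \
  --disk-type pd-ssd \
  --disk-size 100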
Installing Kafka and Knative
Create namespace knative-eventing. Follow the Strimzi quickstart to install Kafka in the knative-eventing namespace, but use a different Kafka cluster definition, see below. Knative workloads expect to run in the knative-eventing namespace, otherwise issues arise. And it's easier to keep Knative and Kafka in one namespace anyway.
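The commands, assuming the install URL from the Strimzi quickstart:
# create the namespace and install the Strimzi operator into it
kubectl create namespace knative-eventing
kubectl create -f 'https://strimzi.io/install/latest?namespace=knative-eventing' -n knative-eventing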
Use kafka-cluster.yaml as the Kafka cluster resource instead of the one used in the Strimzi quickstart (kafka-single-persistent.yaml). If you're not limited on disk, it's best to set storage: size: 50Gi or 100Gi in kafka-cluster.yaml, and at least 25Gi for the zookeeper storage. For the trial quota, you're limited to 20Gi for Kafka and 10Gi for zookeeper (that's if we're doing 2 Kafka clusters; with one, you can go higher).
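If you don't have the file at hand, here's a minimal sketch of such a Kafka resource, based on Strimzi's kafka-single-persistent.yaml with the storage sizes set to the trial-quota limits (the cluster name my-cluster matches the examples below):
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: knative-eventing
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    # single-broker cluster, so all replication factors are 1
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
    storage:
      type: persistent-claim
      size: 20Gi
      deleteClaim: true
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}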
Follow the knative docs to install Knative eventing. Install all the Kafka components too: Kafka sink, Kafka broker, Kafka event source. Use this publication to configure the broker config to use the Kafka broker class (replication: 1).
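For reference, the broker config from the Knative docs looks roughly like this, with the replication factor set to 1 to match the single-replica Kafka cluster above:
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # replication factor 1, since the Kafka cluster above runs a single broker
  default.topic.replication.factor: "1"
  default.topic.partitions: "10"
  bootstrap.servers: "my-cluster-kafka-bootstrap.knative-eventing:9092"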
Also make sure to install the Kafka source. kafka-source-dispatcher will have 0 pods until some Kafka sources are created.
Autoscaling Knative
On the GCP trial quota, you likely won't have space for the Keda controller or upscaled Knative workloads. Otherwise, follow this blog to configure HA for Knative workloads. I would set HA to 6 though, and keep an eye on memory/CPU consumption of the workloads in case you've got significant event traffic going through the system. Otherwise there's going to be a slowdown in event delivery.
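The referenced blog covers the details; as a rough sketch of the idea, assuming the data-plane deployment names from a default eventing-kafka-broker install (verify with kubectl get deploy -n knative-eventing, names may vary by version):
# scale the Kafka data-plane receivers to 6 replicas each;
# dispatcher workloads can be scaled the same way
for d in kafka-broker-receiver kafka-sink-receiver; do
  kubectl -n knative-eventing scale deployment "$d" --replicas=6
done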
Install the scaling controller for Kafka sources - the Keda autoscaler. HPA parameters are controlled by annotations on the KafkaSource yaml definition:
metadata:
  annotations:
    autoscaling.knative.dev/class: keda.autoscaling.knative.dev
    autoscaling.knative.dev/minScale: "0"
    autoscaling.knative.dev/maxScale: "5"
    keda.autoscaling.knative.dev/pollingInterval: "30"
    keda.autoscaling.knative.dev/cooldownPeriod: "30"
    keda.autoscaling.knative.dev/kafkaLagThreshold: "10"
Kafka of course has its own parallelism mechanism - creating more brokers, which enables a higher partition count for a given topic.
Monitoring Knative and Kafka
Follow this publication to set up Prometheus monitoring for the Kafka cluster. DataDog has a nice description of what those metrics mean.
Knative has a tutorial on how to set up monitoring. However, I ended up creating a Service and a ServiceMonitor by hand for the Knative workloads to be able to monitor them.
Here's an example Service and ServiceMonitor for kafka-sink-receiver:
apiVersion: v1
kind: Service
metadata:
  name: knative-sink-service
  labels:
    app: knative-sink-service
spec:
  selector:
    app: kafka-sink-receiver
  ports:
    - name: http-metrics
      protocol: TCP
      port: 9090
      targetPort: http-metrics
---
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: knative-sink-service-monitor
labels:
app: knative-sink-service-mon
spec:
selector:
matchLabels:
app: knative-sink-service
endpoints:
- port: http-metrics
Knative exposes a couple of its own metrics (like processing delays) and also exposes a huge amount of Kafka metrics for its consumers/producers. I ended up curl-ing the Knative services on the metrics port, and scripting a tool that helps create a primitive Grafana dashboard from a list of metric names and the uid of a datasource. See the readme on how to use the tool. Or you can replace the datasource uid in the dashboard-*.json with your datasource uid, and make sure the job selectors in the dashboard JSON match the service name that sends metrics.
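To peek at the raw metric names yourself, here's a quick sketch using the Service defined above:
# port-forward the metrics Service locally, then list the unique metric names
kubectl -n knative-eventing port-forward service/knative-sink-service 9090:9090 &
curl -s localhost:9090/metrics | grep -v '^#' | sed 's/[{ ].*//' | sort -u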
Knative dashboards, together with Kafka's dashboards, shed light on almost any aspect of what's going on in the system.
More tuning
Some useful production-grade considerations for Knative can be found here
Knative exposes consumer and producer configs for brokers and other workloads as configmaps. For the Knative sink-receiver config, I had more luck with setting
auto.offset.reset=latest
enable.auto.commit=true
and the commit interval to about 1.5 seconds (half the heartbeat interval).
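As a sketch of where these settings live, assuming the configmap and key names from a default eventing-kafka-broker install (verify with kubectl get configmap -n knative-eventing; the sink has an analogous config-kafka-sink-data-plane):
# fragment of the data section of the config-kafka-broker-data-plane configmap
config-kafka-broker-consumer.properties: |
  auto.offset.reset=latest
  enable.auto.commit=true
  # commit roughly every 1.5s, half of the default 3s heartbeat interval
  auto.commit.interval.ms=1500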
More on Kafka consumer and producer tuning:
https://strimzi.io/blog/2021/01/07/consumer-tuning/
https://strimzi.io/blog/2020/10/15/producer-tuning/
Make sure it works
You can create a Kafka topic whose messages are transferred to another topic using the Knative machinery:
input-topic -> knative source -> knative broker -> knative trigger (opt: filter by message headers) -> knative sink -> output-topic
Example definitions to use are below. Apply the topics and the broker, and make sure they've got status Ready (kubectl get kafkatopic -n knative-eventing, kubectl get broker -n knative-eventing). Then apply the sink and the source, and also make sure they're ready. Apply the trigger last.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: input-topic
  namespace: knative-eventing
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: output-topic
  namespace: knative-eventing
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-broker
  namespace: knative-eventing
  annotations:
    eventing.knative.dev/broker.class: Kafka
spec: {}
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: input-topic-source
  namespace: knative-eventing
  # keda autoscaler annotations here if using keda
  # see Autoscaling section of blog, above
spec:
  consumerGroup: input-topic-source-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.knative-eventing:9092
  topics:
    - input-topic
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: my-broker
---
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: output-topic-sink
  namespace: knative-eventing
spec:
  topic: output-topic
  bootstrapServers:
    - my-cluster-kafka-bootstrap.knative-eventing:9092
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: output-trigger
  namespace: knative-eventing
spec:
  broker: my-broker
  # you can define a filter for messages based on headers; input Kafka headers get the `kafkaheader` prefix. So if a message was sent to `input-topic` with the header `Ce-my-header: my-value`, its filter here will be `kafkaheadercemyheader: my-value`
  # filter:
  #   attributes:
  #     kafkaheadercemyheader: my-value
  subscriber:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: KafkaSink
      name: output-topic-sink
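Once everything is applied, you can check readiness in one go:
kubectl get kafkatopic,broker,kafkasource,kafkasink,trigger -n knative-eventing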
Here's a primitive Python web app that simply logs a message upon arrival. You can use the echo app as the destination sink instead of the second topic. The deployment for the echo web app should be in the knative-eventing namespace, and expose a ClusterIP type Service that maps port 80 to 8083. If you're not familiar with how to create a deployment and a service for it, use the k8s docs or the Google Console "new deployment" button (you've got to upload the image to dockerhub or another artifact registry first though).
Let's send some messages.
Launch a listener for output-topic:
kubectl -n knative-eventing run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.37.0-kafka-3.5.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic output-topic --from-beginning --property print.headers=true
In another tab, launch a producer for input-topic:
kubectl -n knative-eventing run kafka-producer -ti --image=quay.io/strimzi/kafka:0.37.0-kafka-3.5.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic input-topic --property parse.headers=true --property headers.delimiter=\t --property headers.separator=, --property headers.key.separator=:
And post the following payload to input-topic:
Ce-my-header:my-value\t{"msg":"content"}
The same message should arrive on output-topic, with the original headers getting the kafkaheader prefix:
ce_specversion:1.0,ce_id:...,ce_source:...,content-type:application/json; charset=utf-8,kafkaheadercemyheader:my-value {"msg":"content"}