karan singh

Connecting to a Kafka Cluster Running on Kubernetes from Your Local Machine: CLI & Programmatic Access

Introduction

Why do you need this?

  • For local development, you may want to connect to a remote Kafka cluster running on OpenShift that was deployed using the Strimzi Operator.

Prerequisites

  • OpenShift Container Platform or OKD

  • Strimzi Operator deployed

Deploy Kafka Cluster

  • Create a YAML file with the following contents (suitable only for dev/test clusters, since it uses ephemeral storage)
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
      namespace: nestjs-testing
    spec:
      entityOperator:
        topicOperator: {}
        userOperator: {}
      kafka:
        config:
          inter.broker.protocol.version: "2.8"
          log.message.format.version: "2.8"
          offsets.topic.replication.factor: 3
          transaction.state.log.min.isr: 2
          transaction.state.log.replication.factor: 3
        listeners:
        - name: plain
          port: 9092
          tls: false
          type: internal
        - name: tls
          port: 9093
          tls: true
          type: internal
        - name: route
          port: 9094
          tls: true
          type: route
        replicas: 3
        storage:
          type: ephemeral
        version: 2.8.0
      zookeeper:
        replicas: 3
        storage:
          type: ephemeral
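Before connecting, it can help to confirm that the operator has finished rolling out the cluster. The following is a minimal sketch, assuming the Kafka resource is fetched as JSON (for example with `oc get kafka my-cluster -o json`) and that its status carries a condition of type `Ready`. The helper name `is_kafka_ready` and the sample data are hypothetical.

```python
import json


def is_kafka_ready(kafka_resource: dict) -> bool:
    """Return True if the resource reports a Ready condition with status "True"."""
    conditions = kafka_resource.get("status", {}).get("conditions", [])
    return any(
        c.get("type") == "Ready" and c.get("status") == "True"
        for c in conditions
    )


# Illustrative sample only, not real cluster output.
sample = json.loads(
    '{"status": {"conditions": [{"type": "Ready", "status": "True"}]}}'
)
print(is_kafka_ready(sample))
```

A resource with no status yet (for instance, right after it is created) is treated as not ready.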

Preparing to Connect

    oc get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt

    keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt

    # This should create 2 files in PWD

    ls -l *.crt *.jks
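The `oc get secret ... | base64 -d` step above simply reads the `ca.crt` key from the secret's `data` map and base64-decodes it. A rough Python equivalent, assuming the secret was exported as JSON (for example with `oc get secret my-cluster-cluster-ca-cert -o json`), might look like this. The function name `extract_ca_pem` and the placeholder certificate are hypothetical.

```python
import base64
import json


def extract_ca_pem(secret_json: str) -> str:
    """Decode the base64-encoded 'ca.crt' entry of a Kubernetes secret."""
    secret = json.loads(secret_json)
    encoded = secret["data"]["ca.crt"]
    return base64.b64decode(encoded).decode("ascii")


# Placeholder PEM text, standing in for a real certificate.
fake_pem = "-----BEGIN CERTIFICATE-----\nPLACEHOLDER\n-----END CERTIFICATE-----\n"
fake_secret = json.dumps(
    {"data": {"ca.crt": base64.b64encode(fake_pem.encode("ascii")).decode("ascii")}}
)

pem = extract_ca_pem(fake_secret)
print(pem, end="")
```

Writing the returned text to `ca.crt` gives the same file that the shell pipeline produces.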

Grab Kafka Endpoint

KAFKA_ENDPOINT=$(oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type=="route")].bootstrapServers}{"\n"}')
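The JSONPath filter above picks the listener whose `type` is `route` and returns its `bootstrapServers` value. The same lookup can be sketched in Python, assuming the Kafka resource is available as a parsed JSON document (for example from `oc get kafka my-cluster -o json`). The helper name `route_bootstrap_servers` and the sample status are hypothetical.

```python
def route_bootstrap_servers(kafka_resource: dict) -> str:
    """Return the bootstrap address of the first listener of type 'route'."""
    listeners = kafka_resource.get("status", {}).get("listeners", [])
    for listener in listeners:
        if listener.get("type") == "route":
            return listener["bootstrapServers"]
    raise LookupError("no listener of type 'route' found in status")


# Illustrative status only; the internal address is made up for this example.
sample = {
    "status": {
        "listeners": [
            {"type": "plain", "bootstrapServers": "internal.example:9092"},
            {
                "type": "route",
                "bootstrapServers": "my-cluster-kafka-route-bootstrap-nestjs-testing.apps.ocp.ceph-s3.com:443",
            },
        ]
    }
}
print(route_bootstrap_servers(sample))
```

Raising an error when no route listener exists makes a missing or not-yet-ready listener obvious, rather than silently handing an empty endpoint to a client.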

Connecting from CLI (Kafka Console Producer/Consumer)

  • Get Kafka Console Producer & Consumer script files
    wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz ; tar -xvf kafka_2.13-3.0.0.tgz
  • Console Producer
    kafka_2.13-3.0.0/bin/kafka-console-producer.sh --bootstrap-server $KAFKA_ENDPOINT --producer-property security.protocol=SSL --producer-property ssl.truststore.password=password --producer-property ssl.truststore.location=truststore.jks --topic my-topic
  • Console Consumer
    kafka_2.13-3.0.0/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_ENDPOINT --topic my-topic --from-beginning  --consumer-property security.protocol=SSL --consumer-property ssl.truststore.password=password --consumer-property ssl.truststore.location=truststore.jks
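Both console commands repeat the same three SSL settings. One option is to keep them in a properties file instead. The sketch below renders such a file's contents, assuming simple values that need no Java-properties escaping. The function name `render_ssl_properties` and the file name `client.properties` are hypothetical.

```python
def render_ssl_properties(truststore_path: str, truststore_password: str) -> str:
    """Render the SSL client settings used above as properties-file text."""
    settings = {
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": truststore_password,
    }
    return "".join(f"{key}={value}\n" for key, value in settings.items())


properties = render_ssl_properties("truststore.jks", "password")
print(properties, end="")
```

Saved as `client.properties`, the file can be passed to the console tools with `--producer.config client.properties` or `--consumer.config client.properties` in place of the individual `--producer-property` and `--consumer-property` flags.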

Connecting from Python Client (running locally)

from kafka import KafkaProducer, KafkaConsumer
import json

bootstrap_server = 'my-cluster-kafka-route-bootstrap-nestjs-testing.apps.ocp.ceph-s3.com:443'

print("Producing messages to Kafka topic ...")
producer = KafkaProducer(bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL")

for i in range(10):
    message = {'value': i}
    producer.send('my-topic', json.dumps(message).encode('utf-8'))

# send() is asynchronous; flush so all messages are delivered before consuming
producer.flush()

print("Consuming messages from Kafka topic ...")

# auto_offset_reset='earliest' lets a new consumer group read the messages produced above
consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL", consumer_timeout_ms=10000, enable_auto_commit=True, auto_offset_reset='earliest')
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: value=%s" % (message.topic, message.partition, message.offset, message.value))
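The example above encodes each message by hand and leaves decoding to the reader. kafka-python also accepts `value_serializer` and `value_deserializer` callables, so the JSON handling can live in one place. This is a minimal sketch of that idea. The helper names (`serialize_value`, `deserialize_value`, `ssl_client_config`) are hypothetical, and the Kafka calls are left as comments because they need a reachable cluster.

```python
import json


def serialize_value(value: dict) -> bytes:
    """Encode a message as UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize_value(raw: bytes) -> dict:
    """Decode UTF-8 JSON bytes back into a dict."""
    return json.loads(raw.decode("utf-8"))


def ssl_client_config(bootstrap_server: str, cafile: str = "ca.crt") -> dict:
    """Keyword arguments shared by the producer and the consumer."""
    return {
        "bootstrap_servers": bootstrap_server,
        "security_protocol": "SSL",
        "ssl_cafile": cafile,
    }


# Round trip without touching Kafka.
print(deserialize_value(serialize_value({"value": 1})))

# With kafka-python installed and the cluster reachable:
# from kafka import KafkaProducer, KafkaConsumer
# config = ssl_client_config(bootstrap_server)
# producer = KafkaProducer(value_serializer=serialize_value, **config)
# consumer = KafkaConsumer("my-topic", value_deserializer=deserialize_value, **config)
```

With these in place, `producer.send('my-topic', {'value': i})` takes a dict directly and `message.value` arrives already parsed.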

[Figure: Output of Kafka Python Producer & Consumer example]

This is how you can connect to a remote Kafka cluster from your local machine. It is handy when you develop locally and later deploy the application to your OpenShift environment.
