
Amzar


Basic Kafka Stream in Python

Introduction

Kafka is an open-source distributed streaming platform developed by the Apache Software Foundation. It is designed to handle large volumes of real-time data streams and is used for building real-time data pipelines and streaming applications.

For instance, if you would like to fetch a real-time event log from a source (Twitter, etc.) and insert it into a target (CSV, SQLite, etc.), you can leverage a Kafka library in Python. To do that, you first need to know what producers and consumers are in Kafka.

Producer and Consumer


  1. A producer is a program or application that sends data or messages to a Kafka topic.
  2. A consumer is a program or application that reads data or messages from a Kafka topic.

How to set up Kafka in Python?

Now, let's start with requirements to configure Kafka in Python.

Requirements

Kafka server

You can start with a local installation, or use AWS or GCP for a more advanced setup. In this tutorial, I am launching the Kafka server using Docker Compose.

Assume Docker is installed and running (run `docker --version` to confirm).

Create a Docker Compose file (docker-compose.yaml) and copy the snippet below.

```yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

Then, build and start the containers:

```shell
docker-compose up --build --force-recreate -d
```

To confirm the containers are up, check their status in Docker Desktop or with `docker-compose ps`:

```shell
╰─➤  docker-compose ps
      Name                   Command            State                      Ports
---------------------------------------------------------------------------------------------------
kafka_kafka_1       /etc/confluent/docker/run   Up      0.0.0.0:29092->29092/tcp, 9092/tcp
kafka_zookeeper_1   /etc/confluent/docker/run   Up      0.0.0.0:22181->2181/tcp, 2888/tcp, 3888/tcp
```

Okay, the containers are successfully built and running. Next step: how do we create a topic and send a message?

Kafka producer (Python)

Create a topic

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({"bootstrap.servers": "localhost:29092"})

topic_name = "my_first_topic"
num_partitions = 3

new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=1)

# create_topics() is asynchronous and returns a dict of futures keyed by
# topic name; call result() to find out whether creation actually succeeded.
futures = admin_client.create_topics([new_topic])
try:
    futures[topic_name].result()
    print(f"{topic_name} is created!!")
except Exception as e:
    print(f"{topic_name} is not created: {e}")
```

I am using the confluent_kafka library to integrate with the Kafka server.

We can also define the number of partitions for the topic. It is important to know that the number of partitions can only be increased, never decreased, after the topic has been created.

If you have a topic with 10 partitions, you can have up to 10 consumers processing messages from that topic in parallel, each handling messages from its assigned partition.
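The reason per-key ordering works with multiple consumers is that the default partitioner maps each message key to a fixed partition. Kafka's real partitioner uses a different hash (e.g. murmur2 in librdkafka), but the idea can be sketched in plain Python:

```python
# Conceptual sketch of key-based partitioning. Kafka's actual partitioner
# uses a different hash algorithm; crc32 is used here only to illustrate
# that a given key always maps to the same partition.
import zlib

NUM_PARTITIONS = 3

def pick_partition(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Same key -> same hash -> same partition, so per-key ordering holds.
    return zlib.crc32(key) % num_partitions

p1 = pick_partition(b"user-42")
p2 = pick_partition(b"user-42")
assert p1 == p2           # a given key always lands on the same partition
assert 0 <= p1 < NUM_PARTITIONS
```

Messages without a key are instead spread across partitions (round-robin or sticky, depending on client version).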

```shell
╰─➤  python kafka_topic.py
my_first_topic is created!!
```

Okay, the topic has been created!

Note: If you forgot to create a topic first, no worries. By default, the broker auto-creates the topic when you post a message to it.

Post a message

```python
import json
import uuid
from confluent_kafka import Producer


def post_message(producer, data):
    try:
        producer.produce("my_first_topic", json.dumps(data).encode("utf-8"))
        producer.flush()  # block until the message is actually delivered
        print(f"message posted!! --> {data['comment']}")
    except Exception as e:
        print(f"failed to post message: {e}")


data = {
    "user_session_id": str(uuid.uuid4()),
    "user_name": "John Doe",
    "comment": "Malaysia Boleh!",
}

producer = Producer({"bootstrap.servers": "localhost:29092"})
post_message(producer, data)
```

When you run the Python script, it posts the message:

```shell
╰─➤  python producer.py
message posted!! --> Malaysia Boleh!
```

Kafka consumer (Python)

Okay, let's move to the consumer to check whether the data is really posted.

```python
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:29092",
        "group.id": "python-consumer",
        "auto.offset.reset": "earliest",  # start from the beginning of the topic
    }
)

consumer.subscribe(["my_first_topic"])

while True:
    msg = consumer.poll(1.0)  # wait up to 1 second for a message
    if msg is None:
        continue
    if msg.error():
        print("Error: {}".format(msg.error()))
        continue
    data = msg.value().decode("utf-8")
    topic = msg.topic()
    ts = msg.timestamp()
    print(data, topic, ts)
```

And here is the output when you run the script:

```shell
╰─➤  python consumer.py
{"user_session_id": "5321202b-694a-4d82-9d2b-97174303595e", "user_name": "John Doe", "comment": "Malaysia Boleh!"} my_first_topic (1, 1678561082329)
```

Here is a demo: when you post a message, the consumer prints it out within a second.

Demo

Enjoy ~
