DEV Community

Santhosh Balasa

Confluent Kafka Python Client

-> Download Kafka and untar:

wget http://mirrors.estointernet.in/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz 
tar -xvf ./kafka_2.11-2.1.0.tgz

-> Start Zookeeper service

bin/zookeeper-server-start.sh config/zookeeper.properties

-> Start Kafka service

bin/kafka-server-start.sh config/server.properties

-> Create a topic named obs3-meta

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic obs3-meta --partitions 2 --replication-factor 1

-> Start a Producer to send messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic obs3-meta

-> Start a Consumer to receive messages

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic obs3-meta

-> Ingest: To write into the queue:

"""
Routines to write router info into Kafka Message Queue through resource uuid.

"""
from confluent_kafka import Producer
import logging

from settings import kafka_host, kafka_port, kafka_topic

logger = logging.getLogger(__name__)


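def ingest(parsed_json):
    def delivery_msg(err, msg):
        # Delivery callback: invoked from poll() or flush() once per message.
        if err:
            logger.error(f'obs3-meta kafka message failed delivery: {err}')

    p = Producer({'bootstrap.servers': f'{kafka_host}:{kafka_port}'})
    # Serve any pending delivery callbacks (a no-op for a freshly created producer).
    p.poll(0)
    # Send the payload asynchronously; delivery_msg logs any failure.
    p.produce(kafka_topic, str(parsed_json), callback=delivery_msg)
    # Block until all outstanding messages are delivered.
    p.flush()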

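The Python snippets in this post import kafka_host, kafka_port, and kafka_topic from a settings module that is not shown. Below is a minimal sketch of what such a settings.py could look like, assuming the values come from environment variables. The variable names KAFKA_HOST, KAFKA_PORT, and KAFKA_TOPIC and the load_settings helper are hypothetical. The defaults mirror the localhost:9092 broker and the obs3-meta topic used in the commands above.

```python
# settings.py (hypothetical sketch, not the author's actual module)
import os


def load_settings(env=os.environ):
    """Return (kafka_host, kafka_port, kafka_topic) read from a mapping of env vars."""
    # KAFKA_HOST, KAFKA_PORT, and KAFKA_TOPIC are assumed names; adjust as needed.
    host = env.get('KAFKA_HOST', 'localhost')
    port = int(env.get('KAFKA_PORT', '9092'))
    topic = env.get('KAFKA_TOPIC', 'obs3-meta')
    return host, port, topic


# Module-level names, so that `from settings import kafka_host, ...` works.
kafka_host, kafka_port, kafka_topic = load_settings()
```

Keeping the lookup in a function that takes a mapping makes the defaults easy to check without touching the real environment.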

-> Egest: To read from the queue:

from confluent_kafka import Consumer
import sys
import logging

from settings import kafka_host, kafka_port, kafka_topic

logger = logging.getLogger(__name__)


def egest(uuid):
    c = Consumer({
        'bootstrap.servers': f'{kafka_host}:{kafka_port}',
        'group.id': 'console-consumer-8436',
        'auto.offset.reset': 'earliest'
    })

    c.subscribe([kafka_topic])

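    while True:
        # Wait up to one second for the next message.
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        # Stop at the first message whose payload contains the uuid.
        value = msg.value().decode('utf-8')
        if uuid in value:
            parsed_json = value
            break
    c.close()
    return parsed_json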


if __name__ == '__main__':
    try:
        uuid = 'f70ab2e6-ca07-43b4-9586-de1c9fb45584'
        egest(uuid)
    except Exception as e:
        logger.error(f'Processor exit with: {e}', exc_info=True)
        sys.exit(1)  # exit with error
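The polling loop in egest never returns if no message containing the uuid arrives. Here is one way to bound the wait, sketched under assumptions: poll_for_match, max_wait_s, and poll_timeout_s are hypothetical names, and FakeMessage and FakeConsumer only stand in for Kafka objects so the example runs without a broker.

```python
import time


def poll_for_match(consumer, needle, max_wait_s=30.0, poll_timeout_s=1.0):
    """Poll until a message containing `needle` arrives or `max_wait_s` elapses.

    `consumer` is any object with a poll(timeout) method that returns None or a
    message exposing error() and value(). Returns the decoded text, or None.
    """
    deadline = time.monotonic() + max_wait_s
    while time.monotonic() < deadline:
        msg = consumer.poll(poll_timeout_s)
        if msg is None or msg.error():
            continue  # nothing received, or a consumer-side error: keep polling
        text = msg.value().decode('utf-8')
        if needle in text:
            return text
    return None  # gave up after max_wait_s


class FakeMessage:
    """Stand-in for a Kafka message (illustration only)."""

    def __init__(self, payload):
        self._payload = payload

    def error(self):
        return None

    def value(self):
        return self._payload


class FakeConsumer:
    """Stand-in consumer that replays a fixed list of poll() results."""

    def __init__(self, results):
        self._results = list(results)

    def poll(self, timeout):
        return self._results.pop(0) if self._results else None


consumer = FakeConsumer([
    None,
    FakeMessage(b'{"uuid": "abc"}'),
    FakeMessage(b'{"uuid": "f70a"}'),
])
found = poll_for_match(consumer, 'f70a', max_wait_s=1.0)
```

With a real confluent_kafka Consumer, pass it in place of FakeConsumer and call c.close() afterwards, as egest does.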
