Let's create a simple mini project to interact with Kafka using Python.
We will build a simple message Producer and Consumer. Mini projects like this are helpful when you quickly want to test some behaviour of Kafka. So let's get started.
First off, let's spin up all the required components using this docker-compose.yml. It will start Zookeeper, Kafka and Kafdrop containers in your system.
Let's create our message Producer now.
We need to import KafkaProducer
from the kafka library.
We have to specify the address of our Kafka server (which we created in the above step) while creating a KafkaProducer
.
We need to pass these minimum parameters in the producer.send()
method:
-
Topic name : we will use the name
my_test_topic
- value : the message itself
- key (optional) : the key of the message
from kafka import KafkaProducer
bootstrap_servers = ['localhost:29092']
topic_name = 'my_test_topic'
producer = KafkaProducer(bootstrap_servers = bootstrap_servers)
future = producer.send(topic_name, key=b'1', value=b'This is message 1')
metadata = future.get()
print('message sent successfully on topic %s partition %s' % (metadata.topic, metadata.partition))
When we run our Producer, the specified Kafka topic (my_test_topic
) gets created automatically and the message is sent to the topic.
Let's create the Consumer now. We need the server and topic name as above.
from kafka import KafkaConsumer
import sys
bootstrap_servers = ['localhost:29092']
topic_name = 'my_test_topic'
consumer = KafkaConsumer(
topic_name,
bootstrap_servers = bootstrap_servers,
auto_offset_reset = 'earliest',
enable_auto_commit = True,
group_id = 'my-group-1'
)
try:
for message in consumer:
print ("Message received - %s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
except KeyboardInterrupt:
sys.exit()
Run the Consumer and it will receive the message sent by the Producer.
Thanks for reading! The source code is available in this GitHub repo.
Top comments (0)