DEV Community

Santhosh Thomas
Santhosh Thomas

Posted on • Updated on

Create And Delete Kafka Topic Using Python

Install kafka-python package

Python client for the Apache Kafka distributed stream processing system.

pip install kafka-python
Enter fullscreen mode Exit fullscreen mode
from kafka.admin import KafkaAdminClient, NewTopic


admin_client = KafkaAdminClient(bootstrap_servers=[ipaddress:port])


topic_names = ['topic1', 'topic2', 'topic3' , 'topic3']

def create_topics(topic_names):

    existing_topic_list = consumer.topics()
    print(list(consumer.topics()))
    topic_list = []
    for topic in topic_names:
        if topic not in existing_topic_list:
            print('Topic : {} added '.format(topic))
            topic_list.append(NewTopic(name=topic, num_partitions=3, replication_factor=3))
        else:
            print('Topic : {topic} already exist ')
    try:
        if topic_list:
            admin_client.create_topics(new_topics=topic_list, validate_only=False)
            print("Topic Created Successfully")
        else:
            print("Topic Exist")
    except TopicAlreadyExistsError as e:
        print("Topic Already Exist")
    except  Exception as e:
        print(e)

def delete_topics(topic_names):
    try:
        admin_client.delete_topics(topics=topic_names)
        print("Topic Deleted Successfully")
    except UnknownTopicOrPartitionError as e:
        print("Topic Doesn't Exist")
    except  Exception as e:
        print(e)


consumer = KafkaConsumer(
    bootstrap_servers = "ip_address",
    )
create_topics(topic_names)
Enter fullscreen mode Exit fullscreen mode

Discussion (3)

Collapse
redreed profile image
redreed

Nice snip but not sure if u missed it but u didnt initialize consumer and didnt assign existing_topic_list to receive the topics from consumer

Collapse
sats268842 profile image
Santhosh Thomas Author • Edited on

The code snippet has been modified.

Collapse
sats268842 profile image
Santhosh Thomas Author

Sorry for missing that code snippet.