How to delete records from a Kafka topic

Dejan Maric · Originally published at codingharbour.com

Photo credit: Adli Wahid

Every now and then I get a request from colleagues who would like to delete some or all of the records from a Kafka topic. The request usually comes after someone has produced the wrong data to a test topic, either while playing around or because of a bug in the producer code. Or simply because they want a clean slate.

Whatever the reason, today I’ll show you a few ways to delete some or all the records from a Kafka topic.

It should go without saying that you should use your best judgment and check (at least) twice before using the methods described below in a production environment.

Kafka-delete-records

This command is part of the Kafka CLI tools. It requires two parameters:

  • a bootstrap server and
  • a JSON file, describing which records should be deleted.

The command allows you to delete all the records from the beginning of a partition up to, but not including, the specified offset.

NOTE: It is not possible to delete records in the middle of the topic.

The JSON file specifies one or more partitions from which we want to remove records. Let’s create a delete-records.json file as shown below:

{
    "partitions": [
        {
            "topic": "my-topic",
            "partition": 0,
            "offset": 3
        }
    ],
    "version": 1
}

Here we’ve specified that for partition 0 of the topic “my-topic” we want to delete all the records from the beginning up to, but not including, offset 3. In other words, the records at offsets 0, 1 and 2 are removed.
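If a topic has more than one partition, the file needs one entry per partition, which gets tedious to write by hand. Below is a minimal sketch of a shell helper that prints such a file. The name make_delete_json is hypothetical, and the helper assumes the partitions are numbered 0 to N-1. As far as I know, an offset of -1 asks the tool to delete everything up to the partition's high watermark, but verify that on your Kafka version before relying on it.

```shell
# Hypothetical helper: print an offset JSON file covering partitions 0..N-1.
# Arguments: topic name, number of partitions, offset to delete up to.
make_delete_json() {
    topic=$1
    partitions=$2
    offset=$3
    printf '{\n    "partitions": [\n'
    p=0
    while [ "$p" -lt "$partitions" ]; do
        # Every entry except the last one is followed by a comma.
        if [ "$p" -lt $((partitions - 1)) ]; then sep=','; else sep=''; fi
        printf '        {"topic": "%s", "partition": %d, "offset": %d}%s\n' \
            "$topic" "$p" "$offset" "$sep"
        p=$((p + 1))
    done
    printf '    ],\n    "version": 1\n}\n'
}

# Example use (assumption: offset -1 means "up to the high watermark"):
#   make_delete_json my-topic 3 -1 > delete-records.json
```

The output can be passed to kafka-delete-records in the same way as the hand-written file.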

Now we’re ready to delete records. Execute:

kafka-delete-records --bootstrap-server localhost:9092 \
--offset-json-file delete-records.json

After the command finishes, the start offset of partition 0 will be 3.

Deleting all the records in a topic

NOTE: This will not work for compacted topics.

If you want to prune all the messages, another option is to reduce the retention of the topic to a small value (e.g. 100ms), wait for the brokers to remove all the records from the topic and then set the topic retention to its original value. Here’s how to do it.

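First, set retention.ms to 100 milliseconds. (On recent Kafka versions, where the --zookeeper option of kafka-configs is deprecated or removed, pass --bootstrap-server localhost:9092 instead.)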

kafka-configs --zookeeper localhost:2181 \
--entity-type topics \
--entity-name my-topic \
--alter --add-config retention.ms=100

Then, wait for the brokers to remove the messages with expired retention (that is, all of them). To know whether the process has finished, check whether the start offset and the end offset are the same, which means there are no more records available on the topic. Brokers only check for expired log segments periodically (every log.retention.check.interval.ms, 5 minutes by default), so it might take a few minutes for Kafka to clean up the topic. Keep checking the start offset.

Use the GetOffsetShell class to check the beginning and ending offsets of a topic’s partitions. To check the end offset, set the --time parameter to -1:

kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic my-topic \
--time -1

To check the start offset, use --time -2:

kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic my-topic \
--time -2
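Rather than running the two commands by hand and comparing the numbers, the check can be scripted. Here is a rough sketch that assumes GetOffsetShell prints one topic:partition:offset line per partition. The function names offsets_at, topic_is_empty and wait_until_empty are made up for this example, and the 10-second polling interval is an arbitrary choice.

```shell
# Hypothetical helper: print "topic:partition:offset" lines for a --time value
# (-1 for end offsets, -2 for start offsets).
offsets_at() {
    kafka-run-class kafka.tools.GetOffsetShell \
        --broker-list localhost:9092 --topic "$1" --time "$2"
}

# Succeeds when every partition's start offset equals its end offset.
# Both arguments are GetOffsetShell output; empty input counts as "not empty"
# so that a failed command is never mistaken for a purged topic.
topic_is_empty() {
    [ -n "$1" ] || return 1
    start_sorted=$(printf '%s\n' "$1" | sort)
    end_sorted=$(printf '%s\n' "$2" | sort)
    [ "$start_sorted" = "$end_sorted" ]
}

# Poll until the topic has been purged.
wait_until_empty() {
    until topic_is_empty "$(offsets_at "$1" -2)" "$(offsets_at "$1" -1)"; do
        sleep 10
    done
}

# Example use:
#   wait_until_empty my-topic
```

Comparing the sorted outputs works because both listings contain the same topic and partition prefixes, so they are identical only when every offset matches.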

Once the topic has been purged, restore retention.ms to its original value:

kafka-configs --zookeeper localhost:2181 \
--entity-type topics \
--entity-name my-topic \
--alter --add-config retention.ms=<ORIGINAL VALUE>
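The <ORIGINAL VALUE> placeholder assumes you wrote down the retention before lowering it. Here is a sketch of how that could be captured up front and restored afterwards. It assumes the --describe output contains a retention.ms=<number> entry when the topic has its own override. The function names are hypothetical. If the topic had no override (it was using the broker default), removing the override with --delete-config is probably closer to the original state than setting an explicit value.

```shell
# Print the topic-level retention.ms override found in `kafka-configs --describe`
# output on stdin. Prints nothing when the topic has no override.
current_retention_ms() {
    grep -oE '(^|[[:space:],])retention\.ms=-?[0-9]+' | head -n 1 | sed 's/.*=//'
}

# Hypothetical wrapper: run this BEFORE lowering the retention.
save_retention() {
    kafka-configs --zookeeper localhost:2181 \
        --entity-type topics --entity-name "$1" --describe | current_retention_ms
}

# Hypothetical wrapper: restore the saved value, or drop the temporary
# override if the topic had none to begin with.
restore_retention() {
    topic=$1
    original=$2
    if [ -n "$original" ]; then
        kafka-configs --zookeeper localhost:2181 \
            --entity-type topics --entity-name "$topic" \
            --alter --add-config "retention.ms=$original"
    else
        kafka-configs --zookeeper localhost:2181 \
            --entity-type topics --entity-name "$topic" \
            --alter --delete-config retention.ms
    fi
}

# Example use:
#   original=$(save_retention my-topic)
#   ... lower the retention, wait for the purge ...
#   restore_retention my-topic "$original"
```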

Delete a topic and create it again

This is not as elegant as the previous two approaches, but it might be the easier solution in some cases (e.g. if topic creation is scripted).

kafka-topics --bootstrap-server localhost:9092 \
--topic my-topic \
--delete

Then create it again:

kafka-topics --bootstrap-server localhost:9092 \
--topic my-topic \
--create \
--partitions <number_of_partitions> \
--replication-factor <replication_factor>
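To fill in <number_of_partitions> and <replication_factor>, you need the old topic's values, so it helps to read them before deleting. The sketch below assumes the kafka-topics --describe output has a summary line containing PartitionCount and ReplicationFactor. The names topic_field and recreate_topic are made up for illustration.

```shell
# Read one numeric field (e.g. PartitionCount) from `kafka-topics --describe`
# output on stdin. Assumes a summary line such as
# "Topic: my-topic  PartitionCount: 3  ReplicationFactor: 1  Configs: ...".
topic_field() {
    sed -n "s/.*$1:[[:space:]]*\([0-9][0-9]*\).*/\1/p" | head -n 1
}

# Hypothetical wrapper: delete a topic and recreate it with the same
# partition count and replication factor.
recreate_topic() {
    topic=$1
    description=$(kafka-topics --bootstrap-server localhost:9092 \
        --topic "$topic" --describe)
    partitions=$(printf '%s\n' "$description" | topic_field PartitionCount)
    replication=$(printf '%s\n' "$description" | topic_field ReplicationFactor)
    # Refuse to delete anything if the layout could not be read.
    if [ -z "$partitions" ] || [ -z "$replication" ]; then
        echo "could not read layout of $topic" >&2
        return 1
    fi
    kafka-topics --bootstrap-server localhost:9092 --topic "$topic" --delete
    # Deletion is asynchronous, so this create may need to be retried.
    kafka-topics --bootstrap-server localhost:9092 --topic "$topic" --create \
        --partitions "$partitions" --replication-factor "$replication"
}

# Example use:
#   recreate_topic my-topic
```

Note that this sketch does not carry over topic-level configuration overrides such as retention.ms; those would have to be saved and reapplied separately.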

A few things to be aware of when using this approach:

Make sure the deletion of topics is enabled in your cluster. Set delete.topic.enable=true. From Kafka 1.0.0 this property is true by default.

Make sure all consumers have stopped consuming the data from the topic you want to delete. Otherwise, they will throw errors like:

Received unknown topic or partition error in fetch for partition my-topic-0

or

Error while fetching metadata with correlation id 123 : {my-topic=LEADER_NOT_AVAILABLE}

One more thing that might happen if you have consumers up and running is that the topic gets auto-created, provided the cluster-wide property auto.create.topics.enable is true (and by default it is). That is not bad per se, but the new topic will use the broker defaults for the number of partitions (num.partitions, 1 by default) and the replication factor (default.replication.factor, 1 by default), which might not be what you wanted.

The moral of the story: make sure to stop your consumers before using this approach 🙂

Would you like to learn more about Kafka?

I have created a Kafka mini-course that you can get absolutely free. Sign up for it over at Coding Harbour.
