Eden Jose

Notes on Kafka: Kafka CLI Commands

This section contains useful commands for running Kafka. Note that I encountered some errors while playing around with the commands. I've included the errors near the bottom, along with how I fixed them.

For a quick-read cheat sheet, you may also jump ahead to the bottom of this page, to the Cheat Sheet section.


kafka-topics

We initially installed v2.7.0 on our machine when we discussed Kafka installation on EC2, but the following command is helpful if you'll be working on Kafka infrastructure that's been built by others. The latest release and the current stable version as of this writing is v2.8.0.

To check the version:

[root@hcptstkafka1 kafka]# kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)

Note that you must have both Zookeeper and Kafka running before creating a topic. You may check out the previous write-up for a single-node (single-broker) Kafka cluster, Kafka on EC2. I highly suggest you go through it first, since this article builds upon the knowledge gained from the previous one.

Similarly, you can open three terminals. Start Zookeeper in the first terminal, and then Kafka in the second terminal using the commands below. You can then do all other operations in the third terminal.

# On terminal 1
# (KAFKA_HOME is assumed to point to the Kafka installation folder)
zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

# On terminal 2
kafka-server-start.sh $KAFKA_HOME/config/server.properties

To create a topic, we need to specify:

  • kafka-topics.sh
  • --create --topic, followed by the topic name
  • --partitions, followed by how many partitions you want
  • --replication-factor, followed by a number which should be equal to or less than the number of brokers
  • --zookeeper, followed by the local machine's address and port 2181 (deprecated)
  • --bootstrap-server, which replaces --zookeeper as of v2.2 and points to the broker on port 9092

In the example below, we're creating three topics: two using the deprecated way and one using the new approach. Note that localhost and 127.0.0.1 can be used interchangeably.

[root@hcptstkafka1 ~]# kafka-topics.sh \
--topic test-topic-1 --create \
--partitions 3 --replication-factor 1 \
--zookeeper 127.0.0.1:2181

Created topic test-topic-1.

[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-2 \
--partitions 4 --replication-factor 1 \
--bootstrap-server 127.0.0.1:9092

Created topic test-topic-2.

[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-3 \
--partitions 5 --replication-factor 1 \
--zookeeper localhost:2181

Created topic test-topic-3.

To list the topics,

[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092

test-topic-1
test-topic-2
test-topic-3

[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--zookeeper localhost:2181

test-topic-1
test-topic-2
test-topic-3

To delete a topic, run the commands below. Note that when you run --delete with the new approach, no output is returned.

On the other hand, using the deprecated method with --delete will return a message saying that the topic is marked for deletion.

Either method will work, and the deleted topics will no longer appear when you list the topics again. Let's try to delete test-topic-2 and test-topic-3.

[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-2 \
--zookeeper localhost:2181

Topic test-topic-2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hcptstkafka1 ~]# 
[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-3 \
--bootstrap-server localhost:9092
[root@hcptstkafka1 ~]# 
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092

test-topic-1

To get more details about a certain topic,

[root@hcptstkafka1 ~]# kafka-topics.sh \
--describe --topic test-topic-1 \
--bootstrap-server localhost:9092

Topic: test-topic-1     PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test-topic-1     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-1     Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-1     Partition: 2    Leader: 0       Replicas: 0     Isr: 0

Let's create two more example topics.

[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-eden-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092

Created topic test-eden-1.

[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-tina-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092

Created topic test-tina-1.

Going back to our Kafka home folder, which is where the Kafka package was previously installed, we can check the new logs created in the data/kafka folder.

Here we can see that log directories were created for the first topic, test-topic-1, and for the two new topics, test-eden-1 and test-tina-1. Note that each partition gets its own log directory.

Since we set three partitions for each topic, and there are three topics in total, we should see 9 log directories.

[root@hcptstkafka1 ~]# cd /usr/local/bin/kafka/data/kafka/
[root@hcptstkafka1 kafka]# ll
total 20
-rw-r--r-- 1 root root   4 Aug 15 07:25 cleaner-offset-checkpoint       
-rw-r--r-- 1 root root   4 Aug 15 07:38 log-start-offset-checkpoint     
-rw-r--r-- 1 root root  88 Aug 15 06:13 meta.properties
-rw-r--r-- 1 root root 151 Aug 15 07:38 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 151 Aug 15 07:39 replication-offset-checkpoint   
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-2
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-2
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-0
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-1
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-2
[root@hcptstkafka1 kafka]# 
[root@hcptstkafka1 kafka]# kafka-topics.sh --list --bootstrap-server localhost:9092
test-eden-1
test-tina-1
test-topic-1
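If you'd rather double-check the count than eyeball the listing, here's a minimal sketch that counts the partition directories per topic. The function name count_partition_dirs and the KAFKA_LOG_DIR variable are my own assumptions for illustration (they are not part of Kafka), and the sketch relies on the <topic>-<partition> directory naming seen in the listing above.

```shell
# Hypothetical helper (not a Kafka tool): count partition directories per topic.
# Assumes partition directories are named <topic>-<partition>, as in the listing above.
count_partition_dirs() {
  for d in "$1"/*-[0-9]*/; do
    [ -d "$d" ] || continue          # skip when the glob matches nothing
    name=$(basename "$d")
    echo "${name%-*}"                # strip the trailing -<partition>
  done | sort | uniq -c | awk '{ print $2, $1 }'
}

# KAFKA_LOG_DIR is an assumed variable; the default is the data folder used in this write-up.
count_partition_dirs "${KAFKA_LOG_DIR:-/usr/local/bin/kafka/data/kafka}"
```

On the setup above, each of the three topics should show up with a count of 3.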

kafka-console-producer

To run this command, the following are required:

  • specify the brokers using --broker-list (deprecated)
  • as a replacement for --broker-list, specify --bootstrap-server
  • specify the topic using --topic

When you run the command with the required parameters, it should return a prompt (>). You can now send a message to the topic. To exit, hit Ctrl-C.


[root@hcptstkafka1 ~]# kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-1
>Hello world!
>This is the first message that we sent to a topic!
>^Z

[root@hcptstkafka1 ~]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1
>Awesome job on this!
>Alright!
>^Z

What if I send a message to a non-existent topic?

When you produce a message to a topic that has not been created yet, the producer will return a warning saying "LEADER_NOT_AVAILABLE". You'll still be allowed to type in messages until you exit.

What happens here is that the producer tries to send a message to a non-existent topic and is greeted with a warning. However, the producer is smart enough to recover and wait for the Kafka topic to be created. Once the topic is created, you can continue sending messages to the now-created topic.

[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 --topic test-topic-10
> Third set of messages!
[2021-06-29 19:57:39,480] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-10=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Wait there's error but I'm still able to type in a message
>^Z

# the new topic should now appear when you try to list the topics
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

test-topic-1
test-topic-10

The topic is created with the default values. You can see this when you run --describe.

[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-10 \
--bootstrap-server localhost:9092

Topic: test-topic-10    PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test-topic-10    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

The topic will default to 1 partition and a replication factor of 1. This isn't best practice, so it is recommended to make sure that the destination topics are created before producing to them.

You can also change this default value by editing config/server.properties.

[root@hcptstkafka1 ~] pwd
/usr/local/bin/kafka
[root@hcptstkafka1 kafka]
[root@hcptstkafka1 kafka] vi config/server.properties 

## Some of the output omitted ##

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/bin/kafka/data/kafka

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=4

Whenever we edit the properties file, we'll need to stop the Kafka service and then run it again for the changes to take effect. On the terminal where kafka-server-start.sh was run, hit Ctrl-C then run the command again.

Now if we try to send a message again to a non-existent topic, the topic will be created with the new default value.

# Send the message
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-100
>Another message sent!
[2021-06-29 20:12:57,142] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-100=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Oops! But hey, still sending message!
>^Z

# List the topics and get the details of the newly-created topic.
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
test-topic-1
test-topic-10
test-topic-100
[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-100 \
--bootstrap-server localhost:9092 

Topic: test-topic-100   PartitionCount: 4       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test-topic-100   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-100   Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-100   Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-100   Partition: 3    Leader: 0       Replicas: 0     Isr: 0

Recall that producers can choose to receive acknowledgements from Kafka confirming that a message has been received. This is done through the acks setting.

We'll send a message again, but this time we specify that we want confirmation for each message.

[root@hcptstkafka1 ~] kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic-1 --producer-property acks=all
> This is my second batch of message sent to the topic.
>I should get confirmation from the topic

kafka-console-consumer

Here we'll verify the messages sent to the topics. Similar to the producer, you also need to specify some parameters:

  • bootstrap-server
  • topic

However, by default it will only read the messages that were sent AFTER kafka-console-consumer is run.

If we run kafka-console-consumer, and then run kafka-console-producer in another window and send messages to the topic again, the console consumer will be able to see them in real-time.

# In producer's window
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-10
>Sending a message to the second topic - test-topic-10!
>yeahboii!
>over and out!

# In consumer's window.
# Note that each line of message arrives in real-time.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-10
Sending a message to the second topic - test-topic-10!
yeahboii!
over and out!

To read all the messages sent to the topic, we can append --from-beginning. To see the messages we sent to test-topic-1 earlier,

[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--from-beginning

Alright!
I should get confirmation from the topic
This is the first message that we sent to a topic!
This is my second batch of message sent to the topic.

Note that when you get the messages, they are not retrieved in order. Order is not guaranteed if you have multiple partitions. This is because when the producer sends messages to the topic, the messages are distributed across the partitions.

You can have a guaranteed ordering of messages retrieved if you have a single-partition topic.
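To make the ordering point a bit more concrete, here's a toy simulation in plain shell. It is only a sketch under my own assumptions: plain files stand in for partitions, messages are spread round-robin (the real producer picks partitions differently), and simulate_partitions is a made-up name, not a Kafka command.

```shell
# Toy simulation, NOT Kafka's real partitioner: spread messages round-robin
# over N "partitions" (plain files), then read them back partition by partition.
simulate_partitions() {
  num_partitions=$1
  dir=$(mktemp -d)
  i=0
  while IFS= read -r msg; do
    printf '%s\n' "$msg" >> "$dir/partition-$((i % num_partitions))"
    i=$((i + 1))
  done
  cat "$dir"/partition-*             # read partition 0 first, then 1, and so on
  rm -rf "$dir"
}

# Six messages sent in order m1..m6 to a 3-partition "topic"
printf '%s\n' m1 m2 m3 m4 m5 m6 | simulate_partitions 3
```

The messages come back as m1, m4, m2, m5, m3, m6: each partition keeps its own order, but the overall send order is lost. With a single partition, the output would match the input order.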


Reading Messages from specific partitions and offsets

We can also specify the partition that we want to read by adding the --partition parameter followed by the partition number.

Let's first produce some messages to the test-eden-1 topic:

[root@hcptstkafka1 ~]# kafka-console-producer.sh \
--topic test-eden-1 \
--bootstrap-server localhost:9092

>This topic will contain a list of names
>Barney
>Marshall
>Ted
>Robin
>Lily
>Sheldon
>Rajesh
>Penny
>Leonard
>Howard
>Bernadette
>Amy

Now let's open a second terminal and read the messages, starting from the beginning. Again, the messages will not appear in the same order that they were sent.

[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-eden-1 \
--from-beginning \
--bootstrap-server localhost:9092

This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy

Barney
Ted
Sheldon
Penny
Bernadette
Robin
Howard

This topic was created with 3 partitions. If we want to view only the messages that went to partition 1, we can do the following:

[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--from-beginning \
--bootstrap-server localhost:9092

This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy

Similarly, we can read the messages in the same partition but starting from a particular offset. Note that if you want to specify an offset, remove the --from-beginning parameter.

[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--offset 2 \
--bootstrap-server localhost:9092

Lily
Rajesh
Leonard
Amy
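Offsets are zero-based, which is why --offset 2 skips the first two messages in the partition. As a rough illustration (plain shell, no Kafka involved, and read_from_offset is a name I made up), the same slicing can be reproduced on the partition 1 output from earlier:

```shell
# Messages in partition 1 of test-eden-1, in offset order (copied from the output above).
partition_1='This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy'

# Hypothetical helper: print the messages starting at a zero-based offset.
read_from_offset() {
  # tail -n +K starts at line K (1-based), so offset N maps to line N+1
  tail -n +"$(( $1 + 1 ))"
}

printf '%s\n' "$partition_1" | read_from_offset 2
```

This prints Lily, Rajesh, Leonard, and Amy, matching what the console consumer returned for partition 1 with --offset 2.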

QUESTION: Can I read messages starting from an offset and do this on different partitions?
If we try to query all partitions and display all the messages beginning from a specific offset, we will get an error.

[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--offset 2 \
--bootstrap-server localhost:9092

The partition is required when offset is specified.

Running Multiple Consumers

Let's now try to run a producer and two consumers. Let's create test-topic-10 for this example.

# Create new topic
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create \
--topic test-topic-10 \
--partitions 3 \
--replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-topic-10.

On the first and second terminals, run consumer 1 and consumer 2:

# Runs consumer 1 on terminal 1
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
# Runs consumer 2 on terminal 2
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092

On the third terminal, run the producer. Then start sending messages.

[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
>10
>20
>30
>40
>50
>60

Go back to the first two terminals. You'll see the messages coming in.



Consumer Group

If we have multiple producers sending messages to a single consumer, the rate of incoming messages may overwhelm the consumer, which may then fall behind. To resolve this, we can split the data between two consumers that are both subscribed to the same topic.

We've done this previously, but this time we'll utilize Consumer groups. But what exactly is the difference between using multiple consumers vs. using consumer groups?

Using multiple consumers
As we've seen previously, two consumers subscribed to the same topic will each receive the same messages that are being sent by the producers. It's not really doing any parallel consumption since each consumer is treated as an entirely different application.

Using consumer groups
As we will see, the messages that arrive at the consumer group are load-balanced between the consumers. Consumers registered to the same consumer group will have the same group id.

Ideally, the number of consumers within a consumer group should be equal to the number of partitions that a topic has.


Photo taken from the Kafka: The Definitive Guide book

If there are more consumers than the existing partitions, the excess consumers will go idle and will just waste resources.


Photo taken from the Kafka: The Definitive Guide book

How does Kafka distribute the partitions among the consumers?
At the beginning, a consumer group subscribes to the topic then Kafka "maps" the topic to the group-id of the consumer group. Kafka then checks if any of the existing consumers have the mapped group-id.

If there are two consumers with the same group-id, meaning both are part of the same consumer group, then the topic's partitions are balanced between the two. Note that these consumers cannot have the same assigned partitions.


Photo taken from How can Kafka consumers parallelise beyond the number of partitions
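To picture how partitions end up split within a group, here's a simplified sketch. It hands out partitions round-robin, which is my own simplification for illustration and not necessarily how Kafka's actual assignor works; assign_partitions and the consumer-N labels are made-up names.

```shell
# Simplified illustration only: hand out partitions round-robin to the
# consumers of one group. Kafka's real assignors are more involved.
assign_partitions() {
  partitions=$1
  consumers=$2
  c=0
  while [ "$c" -lt "$consumers" ]; do
    assigned=""
    p=$c
    while [ "$p" -lt "$partitions" ]; do
      assigned="$assigned $p"
      p=$((p + consumers))
    done
    # A consumer with no partitions left over sits idle
    echo "consumer-$c:${assigned:- idle}"
    c=$((c + 1))
  done
}

assign_partitions 3 2   # 3 partitions shared by 2 consumers
assign_partitions 3 4   # more consumers than partitions
```

In the first call, consumer-0 gets partitions 0 and 2 while consumer-1 gets partition 1, and no partition is assigned twice. In the second call, consumer-3 ends up idle since there are only three partitions to go around.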

It is also possible to have multiple consumer groups subscribed to the same topic. This is the case if a second consumer group is performing tasks that are different from those of the first consumer group.


Photo taken from How can Kafka consumers parallelise beyond the number of partitions

To register consumers to a consumer group, we can use the parameter --group when we run consumers.

As mentioned, if we have multiple consumers in a consumer group retrieving messages from a topic, the retrieved messages will be load-balanced between the two consumers. This means some messages will go to consumer 1 while some will go to consumer 2.

To prove this, open three terminals: one for the producer and two for the consumers. First, run kafka-console-consumer with --group on both consumer terminals.

# Run  command in two terminals for the consumers
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group

# Run command in terminal for producer
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 

Then on the producer terminal, run kafka-console-producer and begin sending messages. You'll see that messages will be distributed between the two consumers.


Now, recall that when a consumer retrieves a message from the topic, it commits the offset. When you use --from-beginning with kafka-console-consumer and specify the --group, you'll be able to see all of the messages sent to the topic.

If you try running the same command again with --from-beginning, it will return nothing. This is because the first time this command was run, the offsets for all the messages from the start were already committed for the group.

[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1  \
--group my-awesome-app-group \
--from-beginning

let try sending messages! which consumer will receive this color?
consumer will receive this color?
red
orange
yellow
green 
blue  

# When you rerun the command again, it returns nothing.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1  \
--group my-awesome-app-group \
--from-beginning



kafka-consumer-groups

These commands can be used to list, describe, delete, and basically do a lot of stuff with consumer groups. To list the consumer groups,

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 --list

my-awesome-app-group

To get the details of a consumer group, append --describe and specify the group.

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe 

Consumer group 'my-awesome-app-group' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-awesome-app-group test-topic-1    2          9               11              2               -               -               -
my-awesome-app-group test-topic-1    1          10              10              0               -               -               -
my-awesome-app-group test-topic-1    0          19              20              1               -               -               -

Notice the column for lag. This specifies how many messages in the specific partition have not yet been retrieved and processed by the consumer group. To make sure the consumer group is caught up with all the messages, run kafka-console-consumer again with the same group.

[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group

violet
white
black

Now when you run the describe on the consumer group again, the lag will be zero.

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe 

Consumer group 'my-awesome-app-group' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-awesome-app-group test-topic-1    2          11              11              0               -               -               -
my-awesome-app-group test-topic-1    1          10              10              0               -               -               -
my-awesome-app-group test-topic-1    0          20              20              0               -               -               -
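The LAG column is just LOG-END-OFFSET minus CURRENT-OFFSET for each partition. If you want a single number for the whole group, a small awk sketch like the one below can add it up. The total_lag name is my own, and the column positions (4 and 5) are an assumption based on the describe output shown above; the layout may differ on other Kafka versions.

```shell
# Hypothetical helper: sum the per-partition lag from describe output on stdin.
# Assumes CURRENT-OFFSET is column 4 and LOG-END-OFFSET is column 5, as shown above.
total_lag() {
  awk '$3 ~ /^[0-9]+$/ && $4 ~ /^[0-9]+$/ && $5 ~ /^[0-9]+$/ { lag += $5 - $4 }
       END { print lag + 0 }'
}

# Sample rows copied from the first describe output above
total_lag <<'EOF'
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-awesome-app-group test-topic-1    2          9               11              2    -            -     -
my-awesome-app-group test-topic-1    1          10              10              0    -            -     -
my-awesome-app-group test-topic-1    0          19              20              1    -            -     -
EOF
```

For the sample rows this prints 3, which lines up with the three messages (violet, white, black) that the group still had to read. In practice you could pipe the output of the describe command into total_lag instead of the sample.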

Resetting offsets

As previously mentioned, running kafka-console-consumer.sh and specifying the group will read from the committed offset, which is the last position in the partition where the group left off.

There's actually an option to reset the offset and specify where in the partition/s we want to start reading data, which is in a way replaying the data.

When we run just the kafka-consumer-groups.sh command without any parameters, it will not push through and will instead return the available parameters that you can append to the command.

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh 

# Some parts of the output omitted
--reset-offsets                         Reset offsets of consumer group.
                                          Supports one consumer group at the
                                          time, and instances should be
                                          inactive
                                        Has 2 execution options: --dry-run
                                          (the default) to plan which offsets
                                          to reset, and --execute to update
                                          the offsets. Additionally, the --
                                          export option is used to export the
                                          results to a CSV format.
                                        You must choose one of the following
                                          reset specifications: --to-datetime,
                                          --by-period, --to-earliest, --to-
                                          latest, --shift-by, --from-file, --
                                          to-current.
                                        To define the scope use --all-topics
                                          or --topic. One scope must be
                                          specified unless you use '--from-
                                          file'.

Let's try to reset the offset to the beginning by using --reset-offsets and specifying that we want to start at the beginning with --to-earliest. We also need to define whether we want to reset the offset for just one topic or for all topics.

Lastly, --dry-run (the default) only previews the new offsets without changing anything. Since we want to actually reset the offsets, we'll append --execute.

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets --to-earliest \
--topic test-topic-1 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-awesome-app-group           test-topic-1                   0          0
my-awesome-app-group           test-topic-1                   1          0
my-awesome-app-group           test-topic-1                   2          0

Now if we try to retrieve the messages from test-topic-1, we'll be able to see all the messages from the beginning.

[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group

Alright!
I should get confirmation from the topic
the first
ahh right,
orange
orange
green

This is the first message thatwe sent to a topic!
 This is my second batch of message sent to the topic.
Let's see where this will go
now lets try colors
yellow
red
violet

Hello world!
Awesome job on this!
which consumer
or second
its distributes
red\
green
blue
let try sending messages! which consumer will receive this color?
yellow
blue
white

We can also move the offset forward or backward by using --shift-by, followed by a positive number to advance that many steps forward or a negative number to move backward.

# Moving the offset three steps forward
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 3 \
--topic test-topic-1 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
my-awesome-app-group           test-topic-1                   0          3
my-awesome-app-group           test-topic-1                   1          3
my-awesome-app-group           test-topic-1                   2          3

# Moving the offset another step.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 1 \
--topic test-topic-1 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
my-awesome-app-group           test-topic-1                   0          4
my-awesome-app-group           test-topic-1                   1          4
my-awesome-app-group           test-topic-1                   2          4

# Moving the offset three steps backward.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by -3 \
--topic test-topic-1 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-awesome-app-group           test-topic-1                   0          1
my-awesome-app-group           test-topic-1                   1          1
my-awesome-app-group           test-topic-1                   2          1
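The arithmetic behind --shift-by is simple: the new offset is the current offset plus the shift. Here's a small sketch of that calculation. The shift_offset function is hypothetical, and keeping the result between the earliest and latest offsets of the partition is my assumption about how out-of-range shifts are handled, so treat it as an illustration rather than the tool's exact behavior.

```shell
# Hypothetical helper: compute the offset after a shift.
# Arguments: current offset, shift, earliest offset, latest offset.
# The clamping to [earliest, latest] is an assumption for illustration.
shift_offset() {
  current=$1
  shift_by=$2
  earliest=$3
  latest=$4
  new=$((current + shift_by))
  if [ "$new" -lt "$earliest" ]; then new=$earliest; fi
  if [ "$new" -gt "$latest" ]; then new=$latest; fi
  echo "$new"
}

# Replaying the three shifts above for partition 0 (offsets 0 to 20)
shift_offset 0 3 0 20     # 0 + 3
shift_offset 3 1 0 20     # 3 + 1
shift_offset 4 -3 0 20    # 4 - 3
```

This prints 3, 4, and 1, the same NEW-OFFSET values reported for partition 0 in the three runs above.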

Troubleshooting

Kafka Socket server failed - Bind Exception
This indicates a port conflict.

ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use

To resolve this, you can either run the Kafka server (id=0) on a different port (like 9093) or find the process listening on the port and kill it (easiest).

To find the PID, you can either use netstat or lsof.

[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
tcp6       0      0 :::9092                 :::*                    LISTEN      23999/java
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
java    23999 root  134u  IPv6  52852      0t0  TCP *:XmlIpcRegSvc (LISTEN)

# send a kill signal
[root@hcptstkafka1 kafka]# kill -9 23999

# process should now disappear
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092

# retry running kafka
[root@hcptstkafka1 kafka]# kafka-server-start.sh config/server.properties 

Other CLI options

We've discussed that messages are distributed across the partitions when the producer sends them. A way to ensure that related messages always go to the same partition is through the use of message keys: messages with the same key are sent to the same partition. Consumers can also print the key along with each message.

Producer with keys

kafka-console-producer  \
--broker-list 127.0.0.1:9092  \
--topic first_topic  \
--property parse.key=true  \
--property key.separator=,
> key,value
> another key,another value

Consumer with keys

kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic --from-beginning  \
--property print.key=true  \
--property key.separator=,
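To get a feel for why a key pins messages to one partition, here's a rough sketch: hash the key, then take the result modulo the number of partitions. Note that cksum is only a stand-in that I picked because it ships with most systems. Kafka's default partitioner uses a murmur2 hash, so the partition numbers below will not match a real cluster, and partition_for_key is a made-up name.

```shell
# Stand-in for Kafka's partitioner: cksum is NOT the murmur2 hash Kafka uses,
# so the partition numbers will differ from a real cluster.
partition_for_key() {
  key=$1
  num_partitions=$2
  hash=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
  echo $((hash % num_partitions))
}

partition_for_key "user-42" 3
partition_for_key "user-42" 3   # same key, so the same partition again
partition_for_key "user-7" 3    # a different key may land somewhere else
```

The takeaway is that the mapping is deterministic: as long as the key and the number of partitions stay the same, the message lands in the same partition, which keeps messages for that key in order.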

Cheat Sheet

This simple and easy cheat sheet of commands is from Bogdan Stashchuk's GitHub page for his Apache Kafka Course.

Basic KAFKA Commands

START ZOOKEEPER
bin/zookeeper-server-start.sh config/zookeeper.properties

START KAFKA BROKER
bin/kafka-server-start.sh config/server.properties

CREATE TOPIC
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--replication-factor 1 \
--partitions 3 \
--topic test

LIST TOPICS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

TOPIC DETAILS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic test

START CONSOLE PRODUCER
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test

START CONSOLE CONSUMER
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test

START CONSOLE CONSUMER AND READ MESSAGES FROM BEGINNING
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning

START CONSOLE CONSUMER WITH SPECIFIC CONSUMER GROUP
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--group test \
--from-beginning

LIST CONSUMER GROUPS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list

CONSUMER GROUP DETAILS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group test \
--describe

Resources

I know this write-up is lengthy and I'll probably keep coming back to read it again, but if you're interested in learning more, all my notes are based on the following courses.

Besides this, you can also try out an open-source alternative to the Kafka CLI, KafkaCat. This one I might try once I'm done with the course.


If you've enjoyed my notes or if they somehow brought some value, I'll be glad to connect with you on Twitter! 😃

