This post was originally published on my personal blog.
Apache Kafka is a distributed streaming platform that can act as a message broker, as the heart of a stream processing pipeline, or even as the backbone of an enterprise data synchronization system. Kafka is not only highly available and fault tolerant; it is also designed to handle much higher throughput than traditional message brokers such as RabbitMQ or ActiveMQ.
In this tutorial, you will install Apache Kafka, run three brokers in a cluster, and learn how to produce and consume messages from your cluster. This tutorial assumes that you have no existing Kafka or ZooKeeper installation.
To complete this tutorial, you will need:
- A UNIX environment (Mac or Linux)
- Java 8+ installed
Note: Java 7 support was dropped in 2.0.0. Java 11 support was added in 2.1.0.
Download Apache Kafka and its related binaries from the Apache Kafka website. At the time of this article, the latest version is Apache Kafka 2.1.1. After downloading, extract the .tgz file from the location it was downloaded to and change into the extracted directory. The commands below use the 2.1.0 archive name, so adjust the file name to match your download.
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
Let's take a look at the architecture of a simple Apache Kafka setup.
- Kafka Cluster: A group of Kafka brokers forming a distributed system
- Kafka Broker: An instance of Kafka that holds topics of data
- ZooKeeper: A centralized system for storing and managing configuration
- Producer: A client that sends messages to a Kafka topic
- Consumer: A client that reads messages from a Kafka topic
Kafka utilizes ZooKeeper to manage and coordinate brokers within a cluster. Producers and consumers are the main components that interact with Kafka, which we'll take a look at once we have a running Kafka broker. In this tutorial, we'll be running three Kafka brokers and one ZooKeeper node.
The above diagram shows the architecture of the systems and tools used in this tutorial. It illustrates how the Kafka brokers use ZooKeeper, which components the command line tools interact with, and which ports the running services listen on.
ZooKeeper is a centralized service that is used to maintain naming and configuration data as well as to provide flexible and robust synchronization within distributed systems. Kafka requires ZooKeeper, so we must start an instance of ZooKeeper before we start Kafka.
Conveniently, the download for Apache Kafka includes an easy way to run a ZooKeeper instance. Inside the bin directory, there is a file named zookeeper-server-start.sh. To start ZooKeeper, run the following command from the root directory of your download:
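Assuming the stock layout of the Kafka download, which ships a default ZooKeeper configuration at config/zookeeper.properties, starting ZooKeeper looks roughly like the sketch below. The existence check and the ZK_START and ZK_CONFIG variable names are illustrative additions, not part of Kafka.

```shell
# Assumption: the current directory is the root of the extracted Kafka download.
ZK_START=bin/zookeeper-server-start.sh
ZK_CONFIG=config/zookeeper.properties   # default config bundled with Kafka

if [ -x "$ZK_START" ]; then
  # Runs ZooKeeper in the foreground; its logs stream to this terminal.
  "$ZK_START" "$ZK_CONFIG"
else
  echo "Could not find $ZK_START; cd to the Kafka root directory first." >&2
fi
```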
In your terminal, ZooKeeper logs will start flowing and you will shortly see a line stating that ZooKeeper is running on port 2181. This is ZooKeeper's default port, and it can be changed in the ZooKeeper configuration file.
Note: The default directory where ZooKeeper stores its state is set to /tmp/zookeeper. If you restart your machine, all ZooKeeper data will be lost.
Lastly, open a new terminal window and let ZooKeeper continue running in your original terminal. Ensure you cd to the root directory of your extracted Kafka download.
The official Kafka quick start guide only runs one broker, which isn't really a distributed system or a cluster, so we're going to run three brokers! :)
Let's examine the configuration file for a Kafka broker located at config/server.properties. You can view the configuration file from your new terminal window by running:
There's quite a bit of configuration, but the main properties we care about are the following:
broker.id=0: the unique id of the broker
listeners=PLAINTEXT://:9092: the protocol and port of the broker
log.dirs=/tmp/kafka-logs: the storage location for data in the broker
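To see these three properties without scrolling through the whole file, one option is to filter it with grep. This is only a sketch: show_broker_props is a hypothetical helper name, and the pattern assumes the stock file, where the listeners line is commented out by default. Printing the entire file with cat config/server.properties works just as well.

```shell
# show_broker_props FILE
# Print the broker.id, listeners, and log.dirs lines of a broker config,
# whether or not they are commented out with a leading '#'.
show_broker_props() {
  grep -E '^#?(broker\.id|listeners|log\.dirs)=' "$1"
}

# Assumption: run from the root of the extracted Kafka download.
if [ -f config/server.properties ]; then
  show_broker_props config/server.properties
fi
```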
All three of these configuration properties must be unique per broker, since all of our brokers run on the same machine. By default, the broker id is 0 and the Kafka port is 9092. Since we're going to start three brokers, let's copy this file once per broker and leave server.properties as-is for reference. We can do this by running:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
Next, we need to modify the properties listed above to be unique per broker. You'll want to ensure you uncomment the listeners property. Modify the files using your favorite text editor, or via a CLI program such as vim. Make sure to only modify the lines below, and not to replace the whole file with them!
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9091
log.dirs=/tmp/kafka-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-2
config/server-3.properties:
broker.id=3
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-3
Yay! We now have a configuration file for each broker. Each broker has a unique id, listens on a unique port, and stores data in a unique location.
Note: As with ZooKeeper, the data is stored in the /tmp directory. All data will be lost when you restart your machine.
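If you would rather script these edits than make them by hand, the same three files can be derived from the original with sed. Treat this as a sketch under the assumptions used in this tutorial: make_broker_config is a hypothetical helper, ports follow the 909N pattern, data goes to /tmp/kafka-N, and the stock file contains a broker.id line, a commented-out listeners line, and a log.dirs line.

```shell
# make_broker_config BASE N
# Print a copy of the BASE properties file adjusted for broker N:
# broker.id=N, port 909N (so N must be a single digit), data in /tmp/kafka-N.
make_broker_config() {
  sed \
    -e "s|^broker\.id=.*|broker.id=$2|" \
    -e "s|^#\{0,1\}listeners=.*|listeners=PLAINTEXT://:909$2|" \
    -e "s|^log\.dirs=.*|log.dirs=/tmp/kafka-$2|" \
    "$1"
}

# Assumption: run from the root of the extracted Kafka download.
# Note that this overwrites any existing config/server-N.properties files.
if [ -f config/server.properties ]; then
  for n in 1 2 3; do
    make_broker_config config/server.properties "$n" > "config/server-$n.properties"
  done
fi
```

The listeners pattern is anchored to the start of the line, so the separate advertised.listeners line in the file is left untouched.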
In addition to your current terminal, open two more terminal windows and cd to your Kafka download directory. You should have four terminals open at this point: one running ZooKeeper and three for running Kafka.
To start Kafka, you'll want to run the following commands, with each one in a separate terminal:
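With the stock scripts, each terminal would run bin/kafka-server-start.sh against one of the three files, for example bin/kafka-server-start.sh config/server-1.properties in the first. The sketch below wraps that in a tiny script; BROKER_ID, CONFIG, and the start-broker.sh file name are illustrative, and the script assumes it is run from the Kafka root with the broker number as its first argument.

```shell
# Usage sketch: save as start-broker.sh (hypothetical name) in the Kafka root,
# then run `sh start-broker.sh 1`, `sh start-broker.sh 2`, and
# `sh start-broker.sh 3`, each in its own terminal.
BROKER_ID="${1:-1}"                          # defaults to broker 1
CONFIG="config/server-$BROKER_ID.properties"

if [ -x bin/kafka-server-start.sh ] && [ -f "$CONFIG" ]; then
  # Runs the broker in the foreground; its logs stream to this terminal.
  bin/kafka-server-start.sh "$CONFIG"
else
  echo "Missing bin/kafka-server-start.sh or $CONFIG; run from the Kafka root." >&2
fi
```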
You'll start to see logs in each terminal for the brokers you started. If you look at your ZooKeeper terminal, you'll also see logs from the brokers connecting to ZooKeeper. Each terminal should end with a line similar to:
[2019-03-02 15:28:21,074] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
Congrats! You now have a Kafka cluster running, with a unique broker exposed on each of ports 9091, 9092, and 9093.
Now that we have a Kafka cluster running, let's send some messages! To do this, we must first create a topic. Kafka includes some command line tools to do this, located in the bin directory. Open a new terminal window and cd to the Kafka download directory.
Let's create a topic named test. We can do this by utilizing the kafka-topics.sh script in the bin directory:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test
Let's analyze the arguments we're passing the script:
--create: flag to create a topic
--zookeeper: the ZooKeeper connection string (host:port) used by Kafka
--replication-factor: set the replication factor
--partitions: set the number of partitions
--topic: set the topic name
In the command above, we create a single-partition topic. We also set the replication factor to 3. This means that data will be replicated (copied for redundancy) to all of our brokers.
Note: The maximum replication factor for a topic is the number of brokers you have running. In this case, we have a maximum replication factor of 3.
We can now describe our newly created topic to gain some insight into it:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
This will output something similar to:
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
This explains that our topic test has one partition, a replication factor of three, and no non-default configurations set. It also shows that for our one partition, partition 0, the leader is broker 2 and that we have 3 in-sync replicas. Your leader may be a different broker than 2, but you should still have 3 in-sync replicas.
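One way to check the in-sync replica count without reading the output by eye is to count the entries in the Isr field. A minimal sketch, assuming the output format shown above; isr_count is a hypothetical helper, not a Kafka tool.

```shell
# isr_count
# Read `kafka-topics.sh --describe` output on stdin and print, for each
# partition line, how many broker ids appear in its Isr list.
isr_count() {
  awk '{
    for (i = 1; i < NF; i++)
      if ($i == "Isr:") print split($(i + 1), ids, ",")
  }'
}

# Assumption: run from the Kafka root while the cluster is up.
if [ -x bin/kafka-topics.sh ]; then
  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test | isr_count
fi
```

Fed the sample output above, the helper prints 3 for the single partition. A smaller number would suggest that one of the brokers has fallen out of sync or is not running.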
To learn more about what partitions, replicas, and in-sync replicas mean, check out my post Apache Kafka: An Introduction.
Now that we have a Kafka topic, let's send some messages to it! We can do this using the kafka-console-producer.sh script in the bin directory. This is a handy tool for producing messages from the command line.
Run the console producer with the following command:
bin/kafka-console-producer.sh --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test
We pass the list of Kafka brokers with the --broker-list argument and the name of the topic to produce to with the --topic argument. You should now have a terminal line starting with >. From here, you can type a message and hit enter to send it to Kafka. For example:
> hello world, this is my first message
> this is a second message
Once you've sent some messages, exit the console producer by using ctrl + c.
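The console producer reads from standard input, so messages can also be piped in rather than typed. A sketch under this tutorial's assumptions (brokers on ports 9091 to 9093, topic test); the BROKERS variable is an illustrative name. Keep in mind that anything sent this way will also show up when you consume the topic later.

```shell
# Build the comma-separated broker list once so it can be reused.
BROKERS=$(printf 'localhost:%s,' 9091 9092 9093)
BROKERS=${BROKERS%,}   # drop the trailing comma

# Assumption: run from the Kafka root while the cluster is up.
if [ -x bin/kafka-console-producer.sh ]; then
  # Each line on stdin becomes one message; the producer exits at end of input.
  printf '%s\n' "a piped message" "another piped message" \
    | bin/kafka-console-producer.sh --broker-list "$BROKERS" --topic test
fi
```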
We've successfully sent some messages to our Kafka topic, so the last thing we need to do is read those messages. We can do this by using the kafka-console-consumer.sh script in the bin directory. This is a handy tool for consuming messages from the command line.
Run the console consumer against our topic with the following command:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091,localhost:9092,localhost:9093 --topic test --from-beginning
We set the --bootstrap-server argument to a comma-separated list of our brokers; this can be one or all of the brokers. I typically use all brokers for consistency. We also set the argument --topic to our topic name and pass the --from-beginning flag to read all messages in the topic. If you don't pass --from-beginning, you'll only see messages that have been produced since starting the consumer.
You should see the messages sent earlier appear in the output:
hello world, this is my first message
this is a second message
To exit the consumer, use ctrl + c.
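If you'd rather have the consumer exit on its own, the console consumer also accepts a --max-messages option. A sketch, assuming the two messages sent earlier are the ones you want to read back; MAX_MESSAGES is an illustrative variable name.

```shell
MAX_MESSAGES=2   # stop after this many messages instead of waiting for ctrl + c

# Assumption: run from the Kafka root while the cluster is up.
if [ -x bin/kafka-console-consumer.sh ]; then
  bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9091,localhost:9092,localhost:9093 \
    --topic test --from-beginning --max-messages "$MAX_MESSAGES"
fi
```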
Congrats! You've successfully started a local Kafka cluster, created a topic, sent messages to it with a console producer, and read messages from it with a console consumer. For fun, you can start the console producer and console consumer in separate terminal windows and produce some more messages. You'd then be able to see messages get consumed and printed in real time! Sweet!
You can stop the Kafka brokers and ZooKeeper node by using ctrl + c in their respective terminal windows. I hope this tutorial helped you get a local Kafka cluster set up, and that you're now ready to continue on your Kafka journey!