DEV Community

chauhoangminhnguyen

Originally published at howtodevez.blogspot.com

Using Kafka with Docker and NodeJS

Introduction to Kafka

Kafka is an open-source, distributed messaging system that works on a publish/subscribe model. Many large companies use it for high-throughput, real-time data streaming.

Originally developed at LinkedIn and open-sourced in 2011, Kafka has become one of the most widely used distributed streaming platforms. It can handle very large volumes of records efficiently.


Advantages of Kafka

  • Open-source: Freely available and continuously improved by a large community.
  • High-throughput, high-frequency: Capable of processing large volumes of data across topics continuously.
  • Durable message storage: Messages are persisted to disk and kept for a configurable retention period, so they can be re-read and verified later.
  • Large user community: Offers extensive support and shared resources.

Basic Concepts

If you're new to Kafka and message queues, here are some key concepts to understand:

  • Producer: Creates data and sends it to the Kafka server as messages, which are transmitted in byte-array format.
  • Consumer: Subscribes to one or more topics and reads the messages that producers have sent to them.
  • Consumer Group: A set of consumers sharing the same group ID that divide the work of processing messages; each partition of a subscribed topic is read by only one consumer in the group.
  • Topic: A named stream of messages; producers write data to a topic and consumers fetch messages from it.
  • Broker: A Kafka server that stores messages and acts as the intermediary between producers and consumers.
  • Cluster: A Kafka cluster consists of multiple servers, each known as a broker.
  • ZooKeeper: A service traditionally used to manage the brokers in a cluster. Kafka now also supports KRaft mode, which runs without ZooKeeper.
  • Partition: Each topic is split into one or more partitions, which can be spread across the brokers in a cluster. Each message is written to a single partition (chosen from the message key, if it has one), and ordering is guaranteed only within a partition.
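To make the Partition and Consumer Group ideas concrete, here is a small self-contained TypeScript model. It is a simplified sketch, not Kafka's implementation: the hash is FNV-1a, whereas Kafka's default partitioner uses murmur2, and the assignment is a plain round-robin, one of several strategies Kafka clients support. The names simpleHash, partitionFor and assignPartitions are hypothetical.

```typescript
// Simplified model of partitions and consumer groups (not Kafka's real implementation).
interface KeyedMessage {
  key: string
  value: string
}

// 32-bit FNV-1a hash over UTF-16 code units. Kafka's default partitioner uses
// murmur2 instead; FNV-1a is swapped in here only to keep the sketch short.
function simpleHash(key: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193)
  }
  return hash >>> 0 // reinterpret as unsigned 32-bit
}

// The same key always maps to the same partition, which preserves per-key ordering.
function partitionFor(key: string, numPartitions: number): number {
  return simpleHash(key) % numPartitions
}

// Round-robin assignment: each partition is owned by exactly one consumer in the group.
function assignPartitions(numPartitions: number, consumers: string[]): Map<string, number[]> {
  const assignment = new Map<string, number[]>()
  for (const consumer of consumers) assignment.set(consumer, [])
  if (consumers.length === 0) return assignment
  for (let p = 0; p < numPartitions; p++) {
    assignment.get(consumers[p % consumers.length])!.push(p)
  }
  return assignment
}

const messages: KeyedMessage[] = [
  {key: 'user-1', value: 'login'},
  {key: 'user-2', value: 'login'},
  {key: 'user-1', value: 'logout'},
]
for (const m of messages) {
  console.log(`${m.key} ${m.value} -> partition ${partitionFor(m.key, 3)}`)
}

// consumer-a owns partitions [0, 2], consumer-b owns [1]
console.log(assignPartitions(3, ['consumer-a', 'consumer-b']))
```

Because the key alone decides the partition, both user-1 messages land in the same partition and are read in order by whichever consumer owns it. With more consumers than partitions, the extra consumers receive no partitions and sit idle.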

Using Kafka with Docker

First, create a docker-compose.yml file with the following content:

version: "3"
services:
  kafka:
    image: bitnami/kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,EXTERNAL://kafka:9094
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true

This configuration runs Kafka in KRaft mode, so ZooKeeper isn't needed.
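The three listener settings work together: KAFKA_CFG_LISTENERS says where the broker binds, the protocol map says which security protocol each listener name uses, and KAFKA_CFG_ADVERTISED_LISTENERS says which address the broker tells clients to connect back to. The hypothetical parseListeners helper below is only a sketch that decodes the two strings from the compose file to make that mapping visible; Kafka does this parsing itself.

```typescript
// Values copied from the docker-compose.yml above
const ADVERTISED_LISTENERS = 'PLAINTEXT://127.0.0.1:9092,EXTERNAL://kafka:9094'
const PROTOCOL_MAP = 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'

interface AdvertisedListener {
  name: string
  host: string
  port: number
  protocol: string
}

// Hypothetical helper: decode "NAME://host:port" entries and look up each
// listener's security protocol in the "NAME:PROTOCOL" map.
function parseListeners(advertised: string, protocolMap: string): AdvertisedListener[] {
  const protocols = new Map<string, string>()
  for (const pair of protocolMap.split(',')) {
    const [name, protocol] = pair.split(':')
    protocols.set(name, protocol)
  }
  return advertised.split(',').map(entry => {
    const [name, address] = entry.split('://')
    const colon = address.lastIndexOf(':')
    return {
      name,
      host: address.slice(0, colon),
      port: Number(address.slice(colon + 1)),
      protocol: protocols.get(name) ?? 'unknown',
    }
  })
}

for (const l of parseListeners(ADVERTISED_LISTENERS, PROTOCOL_MAP)) {
  console.log(`${l.name}: clients are told to connect to ${l.host}:${l.port} using ${l.protocol}`)
}
```

Read this way, clients on the host (such as the NodeJS example later in this post) connect through the PLAINTEXT listener at 127.0.0.1:9092, the published port, while other containers on the same Compose network would use kafka:9094. Port 9094 is not published, so it is not reachable from the host.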

Next, start the container in the background:

docker compose up -d

Example of sending a message in Kafka

To open a shell in the Kafka container (docker compose exec takes the service name, kafka, from docker-compose.yml):

docker compose exec kafka bash

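Create a new topic with the following command. With the configuration above, you can use localhost:9092 as {kafka host}:{port} inside the container: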

kafka-topics.sh --bootstrap-server {kafka host}:{port} --create --topic {topic name}

Use the console producer to send messages to the new topic; each line you type is sent as one message (press Ctrl+C to exit):

kafka-console-producer.sh --topic {topic name} --bootstrap-server {kafka host}:{port}

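Use the console consumer, for example in a second terminal, to read messages from the topic. The --from-beginning flag also shows messages that were sent before the consumer started: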

kafka-console-consumer.sh --topic {topic name} --from-beginning --bootstrap-server {kafka host}:{port}
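The --from-beginning flag only decides where a consumer starts when it has no committed offset. The simplified model below treats a partition as an append-only array whose indexes are offsets; startOffset and readFrom are hypothetical helpers mirroring Kafka's earliest/latest reset behaviour, and the same rule applies to fromBeginning: true in the NodeJS example.

```typescript
// Simplified model: a partition is an append-only log and an offset is an index into it.
// startOffset is a hypothetical helper mirroring Kafka's earliest/latest reset rule.
function startOffset(logLength: number, committed: number | undefined, fromBeginning: boolean): number {
  // A committed offset for the consumer group always wins
  if (committed !== undefined) return committed
  // No committed offset: earliest (0) with fromBeginning, otherwise latest (end of log)
  return fromBeginning ? 0 : logLength
}

// Messages a consumer would see, starting at the given offset
function readFrom(log: string[], offset: number): string[] {
  return log.slice(offset)
}

const log = ['first', 'second', 'third']

console.log(readFrom(log, startOffset(log.length, undefined, true))) // all three messages
console.log(readFrom(log, startOffset(log.length, undefined, false))) // nothing yet; only new messages will arrive
console.log(readFrom(log, startOffset(log.length, 2, true))) // only 'third', resuming from the committed offset
```

Note that a committed offset takes priority: once a consumer group has committed its progress, neither --from-beginning nor fromBeginning rewinds it.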

Connecting to Kafka with NodeJS

First, install the necessary package to connect to Kafka in NodeJS:

yarn add kafkajs

Here is a complete example that sends three messages to a topic and then consumes them:

import {Kafka} from 'kafkajs'

const main = async (): Promise<void> => {
  const brokers = ['localhost:9092'],
    groupId = 'group-id-value',
    topic = 'topic-name'

  // Build `length` messages; the index keeps values distinct even within the same millisecond
  const getMessage = (length: number) =>
    Array.from({length}, (_, i) => ({value: 'message value ' + i + ' ' + Date.now()}))

  const kafka = new Kafka({
    brokers,
    connectionTimeout: 1000,
    authenticationTimeout: 1000,
    reauthenticationThreshold: 3000,
  })

  const producer = kafka.producer()
  const consumer = kafka.consumer({groupId})

  // Producing
  await producer.connect()
  await producer.send({
    topic,
    messages: getMessage(3),
  })
  // The producer is no longer needed once the messages are sent
  await producer.disconnect()

  // Consuming
  await consumer.connect()
  await consumer.subscribe({topic, fromBeginning: true})

  await consumer.run({
    // provide either eachMessage or eachBatch, not both
    eachMessage: async ({topic, partition, message}) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        // message.value is Buffer | null, so guard against null
        value: message.value?.toString(),
      })
    },

    // uncomment (and remove eachMessage) to receive messages one batch at a time
    // eachBatch: async ({batch}) => {
    //   const {topic, partition, messages} = batch
    //   messages.forEach(message => {
    //     console.log({topic, partition, timestamp: message.timestamp, key: message.key?.toString(), value: message.value?.toString()})
    //   })
    // },
  })
}

main().catch(console.error)

In consumer.run, the eachMessage handler is called once for every message received, while the eachBatch handler is called once for each batch of messages fetched from a partition, so you can process several messages at once. Provide only one of the two.
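To see the difference in call pattern without a running broker, here is a self-contained simulation. SimpleMessage, SimpleBatch and dispatch are hypothetical stand-ins loosely shaped like the kafkajs payloads (the real message.value is a Buffer | null, modeled here as string | null); they are not the library's types, and dispatch is not how kafkajs schedules handlers internally.

```typescript
// Hypothetical stand-ins loosely shaped like kafkajs payloads (not the real types)
interface SimpleMessage {
  offset: string
  value: string | null // kafkajs uses Buffer | null
}

interface SimpleBatch {
  topic: string
  partition: number
  messages: SimpleMessage[]
}

type EachMessage = (payload: {topic: string; partition: number; message: SimpleMessage}) => Promise<void>
type EachBatch = (payload: {batch: SimpleBatch}) => Promise<void>

// Simulated runner: eachMessage fires once per message, eachBatch once per batch.
// This sketch requires exactly one of the two handlers.
async function dispatch(
  batches: SimpleBatch[],
  handlers: {eachMessage?: EachMessage; eachBatch?: EachBatch},
): Promise<void> {
  const {eachMessage, eachBatch} = handlers
  if ((eachMessage === undefined) === (eachBatch === undefined)) {
    throw new Error('provide exactly one of eachMessage or eachBatch')
  }
  for (const batch of batches) {
    if (eachBatch) {
      await eachBatch({batch})
    } else if (eachMessage) {
      for (const message of batch.messages) {
        await eachMessage({topic: batch.topic, partition: batch.partition, message})
      }
    }
  }
}

const batches: SimpleBatch[] = [
  {topic: 'topic-name', partition: 0, messages: [{offset: '0', value: 'a'}, {offset: '1', value: null}]},
  {topic: 'topic-name', partition: 1, messages: [{offset: '0', value: 'b'}]},
]

// Three handler calls, one per message (null values print as a placeholder)
dispatch(batches, {
  eachMessage: async ({partition, message}) => {
    console.log(partition, message.offset, message.value ?? '(null)')
  },
})
  // Two handler calls, one per batch
  .then(() =>
    dispatch(batches, {
      eachBatch: async ({batch}) => {
        console.log(batch.partition, batch.messages.length, 'messages')
      },
    }),
  )
  .catch(console.error)
```

The null check mirrors the guard needed on message.value in the real consumer, where a message can be sent without a value.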
