DEV Community

Ng Wai Foong for k6

Posted on • Edited on • Originally published at k6.io

How to Load Test Your Kafka Producers and Consumers using k6

Recently, k6 started supporting k6 extensions to extend k6 capabilities for other cases required by the community. The community has already built plenty of extensions. k6 extensions are written in Go, and many of them are reusing existing Go libraries.

This makes k6 to be a versatile tool to test different protocols and adapt to multiple cases. This post is the third part of my series of articles testing various systems using k6:

Let's look in this post how we test the popular Kafka project. Apache Kafka is a powerful event streaming platform that provides the following features:

  • Write and read streams of events
  • Store streams of events for as long as you want to
  • Process streams of events in parallel retrospectively

It works by having client applications write events to the Kafka server. We term this type of application as Producers. Client applications which read and process events from the Kafka server are called Consumers.

Kafka itself is capable of handling hundreds to millions of events per second seamlessly on simple setup. But what if you wanted to test and observe how your Kafka service behaves before going live?

The xk6-kafka extension provides some convenient functions for interacting with Kafka Producers and Consumers. It serves as a producer that can send a high volume of messages per second, allowing you to monitor the system under test (SUT) and test how the application will keep up with the load.

xk6-kafka

At the time of this writing, the xk6-kafka extension provides the following APIs:

Function Description
consume(reader, limit) Consume messages from the Kafka server.
createTopic(address, topic) Create a new topic.
listTopics(address) Return a unique set of topics.
produce(writer, messages) Produce messages to the Kafka server.
reader(brokers, topic) Instantiate a new Reader instance.
writer(brokers, topic) Instantiate a new Writer instance.

Some of the APIs mentioned above do accept additional optional parameters meant for authentication and message compression. Refer to more examples for additional information.

Building k6 with the kafka extension

By default, k6 does not support testing Kafka. Building k6 with the xk6-kafka extension creates a k6 version with the capabilities to test Kafka producers and consumers.

Make sure you have the following installed and ready before you continue:

  • Go (>=1.7)
  • Git

Next, continue the installation by running the following command in your terminal to install the xk6 module:

go install go.k6.io/xk6/cmd/xk6@latest
Enter fullscreen mode Exit fullscreen mode

Once the command finishes successfully, you can start making your own custom k6 binary for Kafka as follows:

xk6 build --with github.com/mostafa/xk6-kafka@latest
Enter fullscreen mode Exit fullscreen mode

It will take some time for the process to create a new k6 binary in your working directory.

Troubleshooting: If you experienced issues installing xk6 extension, kindly head over to the Github repository to download the pre-compiled binaries instead.

Running Kafka

The recommended approach is to use docker as manual installation is quite complicated and prone to errors. You can pull the following image by lensesio from DockerHub. It contains the complete Kafka setup for development.

docker pull lensesio/fast-data-dev:latest
Enter fullscreen mode Exit fullscreen mode

After that, run the following to start the docker in detached mode:

sudo docker run -d --rm --name lenseio -p 2181:2181 -p 3030:3030 \
       -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092  \
       -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev

sudo docker logs -f -t lenseio
Enter fullscreen mode Exit fullscreen mode

Visit http://localhost:3030 to get into the fast-data-dev environment.

k6 test

Import

Now, let’s create a new JavaScript file called test_script.js in the same directory as your k6 binary. Then, add the following import statement at the top of the file:

import { check } from "k6";
import { writer, produce, reader, consume, createTopic } from "k6/x/kafka";
Enter fullscreen mode Exit fullscreen mode

Initialization

Continue by appending the following initialization code:

const bootstrapServers = ["localhost:9092"];
const kafkaTopic = "xk6_kafka_json_topic";

const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);
Enter fullscreen mode Exit fullscreen mode

The code will initialize both the writer and reader instances based on the configuration specified. If you are using a different IP/host address and port for your Kafka server, kindly modify it accordingly.

Next, call the createTopic function to create a new topic. Rest assured that this function will do nothing if the topic already exists.

createTopic(bootstrapServers[0], kafkaTopic);
Enter fullscreen mode Exit fullscreen mode

Let’s create a function which generates a random integer as a unique identifier for each message later on. Please be noted that this is optional and not a mandatory requirement to do load testing.

function getRandomInt(max=1000) {
  return Math.floor((Math.random() * max) + 1);
}
Enter fullscreen mode Exit fullscreen mode

Default function

As for the default function, define it as follows:

export default function () {
    let messages = [{
      key: JSON.stringify({
          correlationId: "test-id-sql-" + getRandomInt(),
      }),
      value: JSON.stringify({
          title: "Load Testing SQL Databases with k6",
          url: "https://k6.io/blog/load-testing-sql-databases-with-k6/",
          locale: "en"
      }),
    },
    {
      key: JSON.stringify({
          correlationId: "test-id-redis-" + getRandomInt(),
      }),
      value: JSON.stringify({
          title: "Benchmarking Redis with k6",
          url: "https://k6.io/blog/benchmarking-redis-with-k6/",
          locale: "en"
      }),
  }];

    let error = produce(producer, messages);
    check(error, {
          "is sent": (err) => err == undefined,
    });
}
Enter fullscreen mode Exit fullscreen mode

The code block above works as follows:

  • Initialize a list of messages
  • Call produce function to publish the messages
  • Check if messages are successfully sent

Teardown

Once you are done with it, create a teardown function and close the connections:

export function teardown(data) {
    producer.close();
    consumer.close();
}
Enter fullscreen mode Exit fullscreen mode

Run the test

Save the file and run the following command on your terminal:

./k6 run --vus 50 --duration 5s test_script.js
Enter fullscreen mode Exit fullscreen mode

You should see the following output:

running (05.0s), 00/50 VUs, 15136 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  5s

    ✓ is sent

    █ teardown

    checks.........................: 100.00% ✓ 15136  ✗ 0
    data_received..................: 0 B    0 B/s
    data_sent......................: 0 B    0 B/s
    iteration_duration.............: avg=16.49ms min=31.9µs med=13.52ms max=1.14s p(90)=28.55ms p(95)=36.46ms
    iterations.....................: 15136   3017.4609/s
    kafka.writer.dial.count........: 151    30.102841/s
    kafka.writer.error.count.......: 0      0/s
    kafka.writer.message.bytes.....: 5.2 MB  1.0 MB/s
    kafka.writer.message.count.....: 30272   6034.9218/s
    kafka.writer.rebalance.count...: 0      0/s
    kafka.writer.write.count.......: 30272   6034.9218/s
    vus............................: 5      min=5       max=50
    vus_max........................: 50     min=50      max=50
Enter fullscreen mode Exit fullscreen mode

Scale the load

You can easily scale the load by increasing the number of vus. For example, the following command uses 500 vus to load test for a minute:

./k6 run --vus 500 --duration 1m test_script.js
Enter fullscreen mode Exit fullscreen mode

If you are new to k6, check out how to configure the load options in the script or run a stress test with k6.

Extend the test

The script above is all about producing messages to your Kafka server. In fact, you can easily modify the code into a test which produces and consumes messages.

Simply add the following code below the for loop code:

let result = consume(consumer, 10);
check(result, {
    "10 messages returned": (msgs) => msgs.length == 10,
});
Enter fullscreen mode Exit fullscreen mode

The code will read 10 messages each time. Simply modify the value to something higher if you wish to consume more messages.

The output is as follows when you run it with the same command:

running (05.0s), 00/50 VUs, 9778 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  5s

    ✓ is sent
    ✓ 10 messages returned

    █ teardown

    checks.........................: 100.00% ✓ 19556      ✗ 0
    data_received..................: 0 B    0 B/s
    data_sent......................: 0 B    0 B/s
    iteration_duration.............: avg=25.53ms min=41.4µs med=18ms max=1.41s p(90)=37.73ms p(95)=52.37ms
    iterations.....................: 9778   1946.80798/s
    kafka.reader.dial.count........: 50     9.955042/s
    kafka.reader.error.count.......: 0      0/s
    kafka.reader.fetches.count.....: 101    20.109184/s
    kafka.reader.message.bytes.....: 15 MB   2.9 MB/s
    kafka.reader.message.count.....: 97830   19478.034846/s
    kafka.reader.rebalance.count...: 0      0/s
    kafka.reader.timeouts.count....: 46     9.158638/s
    kafka.writer.dial.count........: 152    30.263327/s
    kafka.writer.error.count.......: 0      0/s
    kafka.writer.message.bytes.....: 3.4 MB  669 kB/s
    kafka.writer.message.count.....: 19556   3893.615961/s
    kafka.writer.rebalance.count...: 0      0/s
    kafka.writer.write.count.......: 19556   3893.615961/s
    vus............................: 50     min=50      max=50
    vus_max........................: 50     min=50      max=50
Enter fullscreen mode Exit fullscreen mode

Kafka metrics in k6

By default, k6 has its own built-in metrics which are collected automatically. Apart from that, you can create your own custom metrics. Custom metrics can be categorized into the following types:

  • Counter: A metric that cumulatively sums added values.
  • Gauge: A metric that stores the min, max and last values added to it.
  • Rate: A metric that tracks the percentage of added values that are non-zero.
  • Trend: A metric that allows for calculating statistics on the added values (min, max, average and percentiles).

Besides k6, k6 extensions can collect metrics and report them as part of the k6 results output. In this case, xk6-kafka collects individual stats for both reader and writer.

Reader

Let’s have a look at the metrics meant for the reader.

Metrics Type Description
kafka.reader.dial.count Counter Total number of times the reader tries to connect to Kafka.
kafka.reader.error.count Counter Total number of errors occurred when reading from Kafka.
kafka.reader.fetches.count Counter Total number of times the reader fetches batches of messages from Kafka.
kafka.reader.message.bytes Counter Total bytes consumed.
kafka.reader.message.count Counter Total number of messages consumed.
kafka.reader.rebalance.count Counter Total number of rebalances of a topic in a consumer group (deprecated) .
kafka.reader.timeouts.count Counter Total number of timeouts occurred when reading from Kafka

Writer

As for the writer, the metrics are as follows:

Metrics Type Description
kafka.writer.dial.count Counter Total number of times the writer tries to connect to Kafka.
kafka.writer.error.count Counter Total number of errors occurred when writing to Kafka.
kafka.writer.message.bytes Counter Total bytes produced.
kafka.writer.message.count Counter Total number of messages produced.
kafka.writer.rebalance.count Counter Total number of rebalances of a topic (deprecated).
kafka.writer.write.count Counter Total number of times the writer writes batches of messages to Kafka.

There are more available kafka metrics, as you can find them here. However, the extension does not collect all metrics yet. You can follow this GitHub issue to track the progress of their additions.

More examples

Moreover, the xk6-kafka repository provides a few test scripts that work out-of-the-box for new users. At the time of this writing, it comes with the following tests:

Feel free to experiment with them and modify the code accordingly based on your own use cases. If you bump into issues, report them on GitHub.

Conclusion

In conclusion, load testing Apache Kafka is now a lot easier with k6. k6 provides the foundation to create and scale your load tests, and the xk6-kafka extension brings a convenient API to interact with a Kafka server.

If you wish to find out more on other available k6 extensions, simply head over to the bundle builder page. The page also allows you to generate the corresponding command for building your own custom k6 binary.

If you have any questions or are interested in building an extension, join the k6 community on Slack.

Top comments (0)