Event streaming is a technique for processing and transferring data in real time: streams of data are transmitted continuously between components over network protocols, enabling fast data exchange between the various parts of a system or application.
Apache Kafka serves as a prominent platform for managing real-time events. In a previous article, we looked at the architecture of Apache Kafka. Now let’s examine how to run performance tests on it.
Running Apache Kafka locally
To save time on writing docker-compose files, we will use a ready-made image. According to its repository, the image contains:
- A Kafka distribution with Apache Kafka, Kafka Connect, Zookeeper, Confluent Schema Registry and REST Proxy
- Lenses.io Lenses or kafka-topics-ui, schema-registry-ui, kafka-connect-ui
- Lenses.io Stream Reactor, 25+ Kafka Connectors to simplify ETL processes
- Integration tests and examples embedded into the Docker image
So let's launch the instance with the following command.
docker run --detach --rm --name lensesio -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 -e RUN_TESTS=0 lensesio/fast-data-dev:latest
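Once the container is up, we can optionally verify that the broker is reachable using the Kafka CLI tools shipped inside the image (a quick sanity check; the exact tool name and path may vary between image versions):

docker exec lensesio kafka-topics --bootstrap-server localhost:9092 --list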
After opening 127.0.0.1:3030 in the browser, we get a dashboard where we can see, among other things, the created topics and their partitions.
Apache Kafka performance tests
Apache Kafka, like any other service, platform, or server, has its performance limits. To know them, you need to run performance tests against the Apache Kafka cluster and find the load thresholds it can accept. One project that can help with this is xk6-kafka.
xk6-kafka is an extension to the k6 tool that allows performance testing of Apache Kafka with a producer and (optionally) a consumer. We will use the following script to do this.
import { check } from "k6";
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_STRING,
} from "k6/x/kafka";

const brokers = ["localhost:9092"];
const topic = "xk6_kafka_json_topic";

// Producer and consumer for the test topic
const writer = new Writer({
  brokers: brokers,
  topic: topic,
  autoCreateTopic: true,
});
const reader = new Reader({
  brokers: brokers,
  topic: topic,
});
// Direct connection to the broker, used for topic administration
const connection = new Connection({
  address: brokers[0],
});
const schemaRegistry = new SchemaRegistry();

// __VU is 0 in the init context, so the topic is created only once
if (__VU == 0) {
  connection.createTopic({ topic: topic });
}

export const options = {
  thresholds: {
    kafka_writer_error_count: ["count == 0"],
    kafka_reader_error_count: ["count == 0"],
  },
};

export default function () {
  let messages = [
    {
      key: schemaRegistry.serialize({
        data: "test-key-string",
        schemaType: SCHEMA_TYPE_STRING,
      }),
      value: schemaRegistry.serialize({
        data: "test-value-string",
        schemaType: SCHEMA_TYPE_STRING,
      }),
      headers: {
        mykey: "myvalue",
      },
      partition: 0,
      time: new Date(), // Will be converted to timestamp automatically
    },
    {
      key: schemaRegistry.serialize({
        data: "test-key-string",
        schemaType: SCHEMA_TYPE_STRING,
      }),
      value: schemaRegistry.serialize({
        data: "test-value-string",
        schemaType: SCHEMA_TYPE_STRING,
      }),
      headers: {
        mykey: "myvalue",
      },
    },
  ];

  writer.produce({ messages: messages });

  // Read 2 messages only
  messages = reader.consume({ limit: 2 });

  check(messages, {
    "2 messages are received": (messages) => messages.length == 2,
  });

  check(messages[0], {
    "Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic,
    "Key is a string and is correct": (msg) =>
      schemaRegistry.deserialize({
        data: msg.key,
        schemaType: SCHEMA_TYPE_STRING,
      }) == "test-key-string",
    "Value is a string and is correct": (msg) =>
      typeof schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_STRING,
      }) == "string" &&
      schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_STRING,
      }) == "test-value-string",
    "Header equals {'mykey': 'myvalue'}": (msg) =>
      "mykey" in msg.headers &&
      String.fromCharCode(...msg.headers["mykey"]) == "myvalue",
    "Time is past": (msg) => new Date(msg["time"]) < new Date(),
    "Partition is zero": (msg) => msg["partition"] == 0,
    "High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
  });
}

export function teardown(data) {
  // Clean up: drop the test topic and close all connections
  if (__VU == 0) {
    connection.deleteTopic(topic);
  }
  writer.close();
  reader.close();
  connection.close();
}
Inside the script, we create a topic (if it does not exist) and send two messages to it in every iteration. Then we run standard checks on the consumed messages, and in teardown we delete the previously created topic and close the connections. It is important that the number of messages we try to consume is not greater than the number we produced; otherwise the tool will return an error.
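Note that the stock k6 binary does not bundle the Kafka extension, so the script has to run on a custom k6 build produced with xk6. A sketch, assuming a local Go toolchain is installed:

go install go.k6.io/xk6/cmd/xk6@latest
xk6 build --with github.com/mostafa/xk6-kafka@latest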
After running the tool in the console, we get the following summary.
tmp> .\k6.exe run --vus 1 --duration 100s .\scenario.js
          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: .\scenario.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 2m10s max duration (incl. graceful stop):
           * default: 1 looping VUs for 1m40s (gracefulStop: 30s)

running (1m40.0s), 0/1 VUs, 32642 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  1m40s
✓ 2 messages are received
✓ Topic equals to xk6_kafka_json_topic
✓ Key is a string and is correct
✓ Value is a string and is correct
✓ Header equals {'mykey': 'myvalue'}
✓ Time is past
✓ Partition is zero
✓ High watermark is gte zero
█ teardown
checks.........................: 100.00% ✓ 261136 ✗ 0
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=3.05ms min=1.48ms med=2.74ms max=40.27ms p(90)=3.83ms p(95)=3.98ms
iterations.....................: 32642 326.345726/s
kafka_reader_dial_count........: 1 0.009998/s
kafka_reader_dial_seconds......: avg=215ns min=0s med=0s max=7.04ms p(90)=0s p(95)=0s
✓ kafka_reader_error_count.......: 0 0/s
kafka_reader_fetch_bytes.......: 2.1 MB 21 kB/s
kafka_reader_fetch_bytes_max...: 1000000 min=1000000 max=1000000
kafka_reader_fetch_bytes_min...: 1 min=1 max=1
kafka_reader_fetch_size........: 32662 326.54568/s
kafka_reader_fetch_wait_max....: 200ms min=200ms max=200ms
kafka_reader_fetches_count.....: 65265 652.501495/s
kafka_reader_lag...............: 0 min=0 max=0
kafka_reader_message_bytes.....: 2.1 MB 21 kB/s
kafka_reader_message_count.....: 65284 652.691452/s
kafka_reader_offset............: 65284 min=2 max=65284
kafka_reader_queue_capacity....: 1 min=1 max=1
kafka_reader_queue_length......: 0 min=0 max=0
kafka_reader_read_seconds......: avg=16.06µs min=0s med=0s max=5.77ms p(90)=0s p(95)=260.99µs
kafka_reader_rebalance_count...: 0 0/s
kafka_reader_timeouts_count....: 0 0/s
kafka_reader_wait_seconds......: avg=1.51ms min=499.9µs med=1.37ms max=19.7ms p(90)=1.91ms p(95)=1.98ms
kafka_writer_acks_required.....: 0 min=0 max=0
kafka_writer_async.............: 0.00% ✓ 0 ✗ 32642
kafka_writer_attempts_max......: 0 min=0 max=0
kafka_writer_batch_bytes.......: 1.8 MB 18 kB/s
kafka_writer_batch_max.........: 1 min=1 max=1
kafka_writer_batch_size........: 32642 326.345726/s
kafka_writer_batch_timeout.....: 0s min=0s max=0s
✓ kafka_writer_error_count.......: 0 0/s
kafka_writer_message_bytes.....: 3.5 MB 35 kB/s
kafka_writer_message_count.....: 65284 652.691452/s
kafka_writer_read_timeout......: 0s min=0s max=0s
kafka_writer_retries_count.....: 0 0/s
kafka_writer_wait_seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count.......: 65284 652.691452/s
kafka_writer_write_seconds.....: avg=71.73µs min=0s med=0s max=1.86ms p(90)=270.34µs p(95)=276.2µs
kafka_writer_write_timeout.....: 0s min=0s max=0s
vus............................: 1 min=1 max=1
vus_max........................: 1 min=1 max=1
As you can see, in 100 seconds a single VU completed over 32,000 iterations, which means more than 65,000 messages produced and consumed. Pretty fast, right?
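A single VU is only a baseline, though. To probe the cluster's real limits, you can ramp up concurrency by swapping the script's options block for one with ramping stages; a minimal sketch (the durations and VU targets are illustrative, not a recommendation):

export const options = {
  stages: [
    { duration: "30s", target: 10 }, // ramp up to 10 VUs
    { duration: "60s", target: 10 }, // hold the load
    { duration: "10s", target: 0 },  // ramp down
  ],
  thresholds: {
    kafka_writer_error_count: ["count == 0"],
    kafka_reader_error_count: ["count == 0"],
  },
};

While the load grows, keep an eye on kafka_reader_lag: a steadily increasing lag means the consumers can no longer keep up with the producers.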
Analysis of results
A dedicated solution in the form of a k6 extension lets you collect a wealth of metrics that can be used for further analysis and, as shown in the sketch after this list, as pass/fail criteria. They include, among others:
- kafka_reader_error_count
- kafka_reader_wait_seconds
- kafka_reader_message_count
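The thresholds block from the script can be extended with limits on these metrics, covering latency or throughput as well as errors. A sketch; the limit values here are arbitrary and should reflect your own requirements:

export const options = {
  thresholds: {
    kafka_reader_error_count: ["count == 0"],      // no consumer errors allowed
    kafka_reader_wait_seconds: ["p(95) < 0.01"],   // p95 read wait below 10 ms
    kafka_reader_message_count: ["count > 10000"], // minimum messages consumed over the run
  },
};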
As you know, console-level test results can be hard to analyze. Therefore, we created a Grafana dashboard to visualize them; you can find it in our GitHub repository.
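To feed such a dashboard, k6 can stream its metrics to a time-series backend that Grafana reads from, for example InfluxDB (a sketch, assuming an InfluxDB v1 instance listening locally):

.\k6.exe run --out influxdb=http://localhost:8086/k6 --vus 1 --duration 100s .\scenario.js

So what does the dashboard look like from the user's perspective?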