This is a short write-up on the exercise of inserting batches of Kafka records into CockroachDB using Confluent's JDBC Sink Connector, a 'no-code' solution for data ingestion.
An example of how to set up the pipeline locally using Docker is documented in this blog post.
The GitHub repository referenced in this write-up is
The pipeline is very simple:
- A Python script generates data that gets ingested into a Kafka Topic within the Kafka Broker.
- The Topic is partitioned.
- Exactly 1 Kafka Connect task is started for each partition.
- The Task reads from the topic partition and inserts into CockroachDB by making a connection through the Load Balancer.
Infrastructure was deployed using Ansible on Google Cloud VMs:
- Single-node Confluent Platform (Kafka broker and Kafka Connect) on
- 3-node CockroachDB cluster using the
n2d-standard-8|16|32 instance types. Each VM was provisioned with 1 x 2.5TB Persistent SSD (
- Single-node Load Balancer instance running HAProxy.
The main Kafka backend was installed using the Ansible Playbooks for Confluent Platform.
The CockroachDB cluster and the HAProxy load balancer instance were installed using the
fabiog1901.cockroachdb Ansible Collection.
The test was run by executing the convenience Python script
The script coordinates the execution of 4 Ansible Playbooks:
kafka.yaml - Provision and prepare the Kafka cluster.
cockroachdb.yaml - Provision and prepare the CockroachDB cluster.
kafka-producer.yaml - Prepare the Kafka broker and start the Kafka producer.
kafka-consumer.yaml - Run the Kafka consumer, i.e. Kafka Connect.
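As a rough sketch of what such a coordinator might look like (the inventory path and `ansible-playbook` flags here are assumptions, and the real script is more involved):

```python
import subprocess

# Playbook names come from the list above; everything else is illustrative.
PLAYBOOKS = [
    "kafka.yaml",           # provision and prepare the Kafka cluster
    "cockroachdb.yaml",     # provision and prepare the CockroachDB cluster
    "kafka-producer.yaml",  # prepare the broker and start the producer
    "kafka-consumer.yaml",  # run the consumer, i.e. Kafka Connect
]


def playbook_command(playbook: str, inventory: str = "hosts.ini") -> list[str]:
    """Build the ansible-playbook invocation for one playbook."""
    return ["ansible-playbook", "-i", inventory, playbook]


def run_all() -> None:
    """Run the 4 playbooks in order, stopping on the first failure."""
    for pb in PLAYBOOKS:
        subprocess.run(playbook_command(pb), check=True)
```

Calling `run_all()` plays the 4 playbooks sequentially, so the clusters exist before the producer starts and the consumer starts last.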
To load data into the Kafka Topic we used a simple generator written in Python,
The generator leverages the
confluent-kafka package for publishing Avro records of about 60 fields.
The generator is started and left running for 20 minutes before any consumer process is started, so that the Topic always holds a deep backlog of records.
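A minimal sketch of such a generator is below. The field names and the 60-field count are placeholders (the actual schema is in the repository), and for brevity this sketch serializes to JSON rather than Avro, which in the real generator goes through a schema-registry serializer:

```python
import json
import random
import string
import uuid


def make_record(n_fields: int = 60) -> dict:
    """Generate one synthetic record with roughly n_fields fields.
    Field names and values are illustrative, not the project's schema."""
    rec = {"id": str(uuid.uuid4())}
    for i in range(n_fields - 1):
        rec[f"field_{i:02d}"] = "".join(random.choices(string.ascii_lowercase, k=8))
    return rec


def produce_forever(topic: str = "mytopic", bootstrap: str = "localhost:9092") -> None:
    """Publish records to the Topic until interrupted."""
    # Imported here so the pure generator above works without the package.
    from confluent_kafka import Producer

    p = Producer({"bootstrap.servers": bootstrap})
    try:
        while True:
            p.produce(topic, value=json.dumps(make_record()).encode())
            p.poll(0)  # serve delivery callbacks
    finally:
        p.flush()
```

Running `produce_forever()` for 20 minutes before starting Kafka Connect gives the consumer a deep backlog to drain, as described above.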
Kafka Connect was configured with the JDBC Sink Connector; however, a custom
kafka-connect-jdbc-10.6.1.jar file was used: the only change made to the original version was to set
autocommit=true for the SQL transactions, here.
This change is important as it allows statements to execute as implicit transactions, thereby saving a round trip for the commit message.
The Jar file can be found in the
Similarly, a custom PostgreSQL JDBC Driver was used, allowing for batch statements to be larger than 128 records, see here.
The result is we can now test with multi-value INSERT statements that have more than 128 values.
The custom driver Jar file is also in the
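The effect of the two customizations can be illustrated with a small Python sketch (CockroachDB speaks the PostgreSQL wire protocol, so `psycopg2` stands in for the JDBC driver here; the table and column names are hypothetical):

```python
def build_multivalue_insert(table: str, columns: list[str], n_rows: int) -> str:
    """Build one multi-value INSERT with a placeholder group per row.
    With the driver's 128-row batch cap lifted, n_rows can exceed 128."""
    group = "(" + ", ".join(["%s"] * len(columns)) + ")"
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
        + ", ".join([group] * n_rows)
    )


def insert_batch(dsn: str, rows: list[tuple]) -> None:
    """Execute one batch as a single implicit transaction."""
    import psycopg2  # stand-in for the JDBC driver in this sketch

    conn = psycopg2.connect(dsn)
    conn.autocommit = True  # implicit transaction: no separate COMMIT round trip
    with conn.cursor() as cur:
        sql = build_multivalue_insert("mytable", ["id", "payload"], len(rows))
        cur.execute(sql, [v for row in rows for v in row])
    conn.close()
```

The single multi-value statement plus autocommit is what lets each task push a whole poll's worth of records to the database in one round trip.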
The 3-node CockroachDB cluster runs version 22.2.5|6.
The database was seeded with approximately 0.5TB of data.
The data was generated externally and imported from Google Cloud Storage directly into the database.
CockroachDB stored the data with a Replication Factor of 3, the default.
With 3 nodes and a replication factor of 3, this implies that every node holds a full copy of the entire dataset.
See custom settings and DDL statements executed in file
We tested with 3 instance types, multiple Kafka topic partition counts, and multiple batch sizes.
play.py was used to run the tests.
In short, for each instance type, we cycled through all partitions, and for each partition, we cycled through all batch sizes.
On each partition cycle, the JDBC Sink Connector was created with
tasks.max set to the same number as the partition count.
Here, a task is a process that creates a database connection, consumes records from the assigned topic partition, prepares the INSERT statement and finally sends it to CockroachDB for execution.
On each batch size cycle, the JDBC Sink Connector was created with
consumer.override.max.poll.records set to the current
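Each cycle therefore boils down to re-creating the connector via the Kafka Connect REST API with a new `tasks.max` / `consumer.override.max.poll.records` pair. A sketch with the standard library only (the topic name, connection URL, and connector name are placeholders):

```python
import json
import urllib.request


def connector_config(partitions: int, batch_size: int) -> dict:
    """Build the JDBC Sink Connector config for one test cycle."""
    return {
        "name": f"jdbc-sink-{partitions}p-{batch_size}b",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics": "mytopic",
            # one task per topic partition
            "tasks.max": str(partitions),
            # records fetched per poll, hence rows per multi-value INSERT
            "consumer.override.max.poll.records": str(batch_size),
            "connection.url": "jdbc:postgresql://haproxy:26257/defaultdb",
            "insert.mode": "insert",
        },
    }


def create_connector(cfg: dict, connect_url: str = "http://localhost:8083") -> None:
    """POST the config to the Kafka Connect REST API."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(cfg).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    urllib.request.urlopen(req)
```

Cycling `connector_config()` over the partition counts and batch sizes reproduces the test matrix described above.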
Results for transaction latency, throughput (TPS), and CPU utilization are shown below for each test case.
per_stmt_latency_ms is a computed value, derived by dividing
- It is generally recommended to keep cluster CPU utilization at around 50% so as to leave headroom for sudden spikes, node failures, and background database operations such as backups, CDC feeds, and import/export jobs.
- Write throughput varies greatly depending on the hardware utilized. See the public cloud hardware recommendations for CockroachDB in the Cloud Report.
- Transaction latency varies in multi-region clusters: as you would expect, transactions have to ensure that at least one out-of-region replica is kept in sync.
- Other factors impacting latency include, but are not limited to: read/write ratio, count of secondary indexes, database topology, client location, record size.
In this project, I have tweaked both the driver and the
kafka-connect-jdbc connector. For my next tests, I'd like to:
- Explore best ways to optimize the Kafka Connector, possibly working along with the Confluent engineering team.
- Replace the standard JDBC PostgreSQL Driver with the
cockroachdb-jdbc driver, kindly developed and maintained by Kai Niemi.