
chriscasano


Change Data Capture from CockroachDB to Confluent Cloud


Here's a simple tutorial for sending data from CockroachDB directly to Confluent Cloud using CockroachDB Change Data Capture, which is typically referred to as a Changefeed. The example applies to both CockroachCloud and a self-hosted deployment of CockroachDB. It was tested on CockroachDB 20.2 and Confluent Cloud 1.25.

Set Up Confluent Cloud

Set Up Your Kafka Cluster

Get Kafka Resource ID

The ID listed here for your Kafka cluster is needed in the steps below.

ccloud kafka cluster list

Create API Keys

The API Key and API Secret are needed to create the CockroachDB Changefeed.

ccloud api-key create --resource <RESOURCE ID>

Get Kafka Endpoint

The endpoint is needed to connect the Changefeed to Kafka.

ccloud kafka cluster describe <RESOURCE ID>
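
The endpoint reported for the cluster typically carries a scheme such as SASL_SSL:// or https://, while the Changefeed created later in this tutorial uses kafka:// with port 9092. As a minimal sketch of that conversion, the hypothetical helper below (the function name and the example hostname are assumptions, not part of any Confluent or CockroachDB tooling) keeps only the hostname and rebuilds the address:

```python
def to_kafka_sink_address(endpoint: str, port: int = 9092) -> str:
    """Turn an endpoint like 'SASL_SSL://host:9092' into 'kafka://host:9092'.

    Hypothetical helper: keeps only the hostname from the endpoint and
    applies the port used in this tutorial (9092) unless another is given.
    """
    text = endpoint.strip()
    # Drop any scheme prefix such as "SASL_SSL://" or "https://".
    _, sep, rest = text.partition("://")
    authority = (rest if sep else text).split("/", 1)[0]
    # Drop an existing port, if present, so the requested one is applied.
    host = authority.rsplit(":", 1)[0] if ":" in authority else authority
    if not host:
        raise ValueError(f"no hostname found in endpoint: {endpoint!r}")
    return f"kafka://{host}:{port}"


if __name__ == "__main__":
    # The hostname below is a made-up placeholder, not a real cluster.
    print(to_kafka_sink_address("SASL_SSL://pkc-example.us-east-1.aws.confluent.cloud:9092"))
```

Plain string handling is used here on purpose: Python's urllib.parse does not treat SASL_SSL as a valid URL scheme because of the underscore, so it would not split that endpoint into scheme and host.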

Create Topic

ccloud kafka topic create demo_t --partitions 6

Start a Kafka Consumer to Verify Your Change Data Feed

ccloud kafka topic consume demo_t

Set Up CockroachDB or CockroachCloud

Note that Changefeeds do not currently work on the CockroachCloud Free Tier. Use a Dedicated cluster to try this instead.

Create CockroachDB Table

Open a new terminal window, leaving the Kafka consumer terminal open for later. Log in to the cockroach sql command line and enter the following commands.

First, ensure rangefeeds are enabled.

SET CLUSTER SETTING kv.rangefeed.enabled = true;

Next, create a table.

create table t (k int default unique_rowid() primary key, v string);

Create Changefeed

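When creating the Changefeed, note that the sink URI uses kafka:// rather than the scheme of the endpoints Confluent Cloud returned earlier (i.e., https:// or SASL_SSL://). Also, be sure to include your API Key and API Secret in the URI. The topic_prefix=demo_ parameter is prepended to the table name t, so rows are published to the demo_t topic created earlier.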

CREATE CHANGEFEED FOR TABLE t INTO 'kafka://<CONFLUENT CLOUD URL>:9092?sasl_enabled=true&sasl_password=<API SECRET>&sasl_user=<API KEY>&tls_enabled=true&topic_prefix=demo_' WITH updated, key_in_value, format = json;
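
API Secrets often contain characters such as + or / that can break a query string if pasted in as-is, so it may help to URL-encode the credentials before placing them in the sink URI. The following is a minimal sketch under that assumption; the function name, the broker hostname, and the credentials are hypothetical placeholders, and the parameters mirror the ones in the statement above:

```python
from urllib.parse import urlencode


def build_changefeed_sink_uri(broker: str, api_key: str, api_secret: str,
                              topic_prefix: str = "demo_") -> str:
    """Assemble a kafka:// sink URI with URL-encoded query parameters.

    Hypothetical helper: `broker` is 'host:port' without any scheme.
    """
    params = {
        "sasl_enabled": "true",
        "sasl_password": api_secret,  # percent-encoded by urlencode
        "sasl_user": api_key,
        "tls_enabled": "true",
        "topic_prefix": topic_prefix,
    }
    return f"kafka://{broker}?{urlencode(params)}"


if __name__ == "__main__":
    # Placeholder values only; substitute your own endpoint and credentials.
    uri = build_changefeed_sink_uri(
        "pkc-example.us-east-1.aws.confluent.cloud:9092",
        "EXAMPLEKEY",
        "abc+def/ghi=",
    )
    print(f"CREATE CHANGEFEED FOR TABLE t INTO '{uri}' "
          "WITH updated, key_in_value, format = json;")
```

The printed statement can then be pasted into the cockroach sql session. Because the encoding also escapes single quotes, the URI is safe to wrap in the SQL string literal.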

Insert Some Rows

insert into t (v) values ('one');
insert into t (v) values ('two');
insert into t (v) values ('three');

Verify data is showing up in your consumer app
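
With the updated, key_in_value, and format = json options, each message in the consumer is expected to be a JSON object with an after field (the row), a key field (the primary key as an array), and an updated field (a timestamp string whose integer part is nanoseconds since the Unix epoch). The sketch below parses one such message; the sample payload is a hand-written illustration of that assumed shape rather than captured output, and the function name is hypothetical:

```python
import json
from datetime import datetime, timezone


def parse_changefeed_message(raw: str) -> dict:
    """Extract the key, row, and update time from one changefeed message.

    Assumes the JSON shape described above; `after` may be null for a
    deleted row, in which case `row` is None.
    """
    msg = json.loads(raw)
    # The part before the decimal point is wall-clock time in nanoseconds.
    wall_nanos = int(msg["updated"].split(".")[0])
    updated_at = datetime.fromtimestamp(wall_nanos // 1_000_000_000, tz=timezone.utc)
    return {"key": msg["key"], "row": msg["after"], "updated_at": updated_at}


if __name__ == "__main__":
    # Illustrative sample only; real keys come from unique_rowid().
    sample = ('{"after": {"k": 1, "v": "one"}, "key": [1], '
              '"updated": "1600000000000000000.0000000000"}')
    print(parse_changefeed_message(sample))
```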

