
Robin Moffatt

Originally published at rmoff.net

Running a self-managed Kafka Connect worker for Confluent Cloud

Confluent Cloud is not only a fully managed Apache Kafka service; it also provides important additional pieces for building applications and pipelines, including managed connectors, Schema Registry, and ksqlDB. Managed connectors are run for you (hence, managed!) within Confluent Cloud - you just specify the technology you want to integrate with, in or out of Kafka, and Confluent Cloud does the rest.

(Diagram: a managed connector running within Confluent Cloud)

The rate at which managed connectors are being added is impressive, but you may find that the connector you want to use with Confluent Cloud isn’t yet available. In that case you need to run your own Kafka Connect worker, which then connects to Confluent Cloud.

(Diagram: a self-managed Kafka Connect worker connecting to Confluent Cloud)

In my case I have a Confluent Cloud cluster running on GCP, so it makes sense to run my worker there too (I could run it anywhere, but closer to the cluster seems sensible). I have an ActiveMQ connector that’s pulling data from a third-party service that’s also Cloud-based (hence wanting to get all my processing into the Cloud too).

(Diagram: the ActiveMQ connector pulling data into Confluent Cloud via a self-managed worker on GCP)

After processing the data in Confluent Cloud (with ksqlDB) I’m going to be streaming the data over to Elasticsearch - in Elastic Cloud. For this I will be able to take advantage of a Confluent Cloud managed connector for Elasticsearch:

(Diagram: a managed Elasticsearch sink connector streaming from Confluent Cloud to Elastic Cloud)

Let’s take a look at what’s involved in running a self-managed Kafka Connect worker alongside Confluent Cloud.

What are my options for running a Kafka Connect worker?

Ultimately, Kafka Connect workers are just JVM processes, so a few options present themselves: you can deploy them on bare metal, on a VM, or in containers.

In this article I’m going to look at the last of these: Docker.
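If you just want to poke around the image itself before wiring anything up, you can pull it and get a shell directly (a quick sketch - the worker itself won’t start without the broker configuration covered below):

docker run --rm -it confluentinc/cp-kafka-connect-base:6.0.1 bash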

Running Kafka Connect from Docker Compose, connecting to Confluent Cloud

The reason I love working with Docker is that running software no longer looks like this:

  1. Download the software

  2. Unpack the software

  3. Run the installer

  4. Install other stuff to meet dependency requirements

  5. Uninstall the previous version that’s creating conflicts

  6. Try to find previous config files

  7. Cry

Instead it looks like this:

  1. Define software requirements and configuration in a Docker Compose file

  2. Run Docker Compose

  3. Win

I’m skipping over a few of the steps, but it’s the ability to simply declare the config and runtime and then instantiate it that makes Docker such a joy to use.

To run Kafka Connect using Docker you start with the base image. From there you need to do a few things before the container launches the worker:

  1. Specify environment variables

  2. Install the necessary connector plugins (as well as any Single Message Transforms and Converters, if you’re using ones other than those that ship with the image)

  3. Install any other required files, such as JDBC drivers - a sketch of these steps follows below.
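As a rough sketch, those pre-launch steps might look like this inside the container (the JDBC connector version, jar name, and paths here are illustrative examples, not taken from this pipeline):

# Install a connector plugin from Confluent Hub
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.0.1
# Drop a JDBC driver into the plugin's lib folder so the connector can load it
cp /tmp/mysql-connector-java.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/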

Once the worker is running you can then:

  1. Create connectors. This can be automated, or done manually - I like to automate it :) Either way it happens through the worker’s REST API, as sketched below.
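Here’s a quick sketch of the kind of REST calls you can make once the worker is up (the connector name is hypothetical, port 8083 is the worker’s default REST port, and jq is assumed to be installed):

# List the connector plugins installed on the worker
curl -s http://localhost:8083/connector-plugins | jq '.[].class'
# Check the status of a connector and its tasks
curl -s http://localhost:8083/connectors/my-connector/status | jq '.'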

Here’s what a Docker Compose file looks like for running Kafka Connect locally, connecting to Confluent Cloud. Note that by default it is also configured to use the Schema Registry hosted in Confluent Cloud for the key and value converters.

---
version: '3'
services:

  kafka-connect-ccloud:
    image: confluentinc/cp-kafka-connect-base:6.0.1
    container_name: kafka-connect-ccloud
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-ccloud'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://MY-SR-CCLOUD-ENDPOINT.gcp.confluent.cloud"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SR_USER:SR_PASSWORD"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://MY-SR-CCLOUD-ENDPOINT.gcp.confluent.cloud"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SR_USER:SR_PASSWORD"
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
        while : ; do
            curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
            echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
            if [ $$curl_status -eq 200 ] ; then
                break
            fi
            sleep 5
        done
        echo -e "\n--\n+> Creating Kafka Connect source connectors"
        curl -i -X PUT -H "Accept:application/json" \
            -H "Content-Type:application/json" \
            http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/config \
            -d '{
                "connector.class" : "io.confluent.connect.activemq.ActiveMQSourceConnector",
                "activemq.url" : "tcp://my-activemq-endpoint:61619",
                "activemq.username" : "ACTIVEMQ_USER",
                "activemq.password" : "ACTIVEMQ_PASSWORD",
                "jms.destination.type" : "topic",
                "jms.destination.name" : "TRAIN_MVT_EA_TOC",
                "kafka.topic" : "networkrail_TRAIN_MVT",
                "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable" : "false",
                "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable" : "false",
                "topic.creation.default.partitions" : 1,
                "topic.creation.default.replication.factor" : 3,
                "confluent.license" : "",
                "confluent.topic.bootstrap.servers" : "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092",
                "confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";",
                "confluent.topic.security.protocol" : "SASL_SSL",
                "confluent.topic.ssl.endpoint.identification.algorithm": "https",
                "confluent.topic.sasl.mechanism" : "PLAIN",
                "confluent.topic.request.timeout.ms" : "20000",
                "confluent.topic.retry.backoff.ms" : "500"
            }'
        #
        #
        sleep infinity

Note that this does everything needed:

  • Installs the connector (ActiveMQ)

  • Launches the Kafka Connect worker (forked to a background process with &)

  • Waits for the worker to be available

  • Creates the connector

    • Observe that topic.creation.default.partitions and topic.creation.default.replication.factor are set - this means that the target topics that the connector writes to will be created automagically in Confluent Cloud. This is possible because of KIP-158, which I wrote about recently.

One other point to note is that the worker uses Kafka itself to store state, including configuration and status, and it does so in the topics defined under:

  • CONNECT_CONFIG_STORAGE_TOPIC

  • CONNECT_OFFSET_STORAGE_TOPIC

  • CONNECT_STATUS_STORAGE_TOPIC

If you’re [re]creating workers, make sure that you don’t have a clash on these topic names - append a unique number to the end (as with the -v04 suffix above), or use the epoch as part of the name; a sketch of the epoch approach follows.
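A minimal sketch, matching the environment variables used above:

# Use the current epoch time as a unique suffix for the group ID and topics
EPOCH=$(date +%s)
export CONNECT_GROUP_ID="kafka-connect-group-${EPOCH}"
export CONNECT_CONFIG_STORAGE_TOPIC="_kafka-connect-group-${EPOCH}-configs"
export CONNECT_OFFSET_STORAGE_TOPIC="_kafka-connect-group-${EPOCH}-offsets"
export CONNECT_STATUS_STORAGE_TOPIC="_kafka-connect-group-${EPOCH}-status"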

Deploying a Docker image to Google Compute Engine (GCE) / Google Cloud Platform (GCP)

NOTE: This is 💯 a Proof-of-Concept (i.e. not blessed by Confluent in any way as "The Right Way"), and builds on my previous experimentation. If you’re doing this in anger then for sure you should figure out how to do it properly, but for my purposes of a quick & dirty solution it worked well. It Works On My Machine [well, Google’s]™.

So, taking the above Docker Compose definition, we can use GCE’s feature for running containers on Compute Engine to provision this directly. For AWS, see the approach that I wrote about here.

To launch a container on GCE you can use either the web UI or the gcloud command line. The first part of it is simple enough - we name the VM that holds the container, specify the image to use, and so on:

gcloud compute instances create-with-container \
        rmoff-connect-source-v01 \
        --zone=us-east1-b \
        --tags kafka-connect \
        --metadata=google-logging-enabled=true \
        --container-image confluentinc/cp-kafka-connect-base:6.0.1 \
        --container-restart-policy=never \
        […]

When the image starts up, by default it runs the Kafka Connect worker. However, we can override this by specifying a custom command. We run /bin/bash as the command, and then pass in -c as the argument followed by an argument that holds the actual shell script we want to execute:

        […]
        --container-command=/bin/bash \
        --container-arg=-c \
        --container-arg='set -x
        # Run this stuff when the container launches
        […]
        #
        sleep infinity'

Within that command block we use the command seen in the Docker Compose YAML above. So far, so good.

But (you knew there was a but coming, didn’t you) we also need to specify environment variables - and not just a few, and not just with straightforward values. We’ve got dozens of them, and because we’re specifying SASL config there are quote marks in there, escape characters, and more. The gcloud CLI has the --container-env argument, in which we can pass the environment variables as a comma-separated list of key=value pairs; the separator can be overridden to a custom character (see gcloud topic escaping) - but you still end up with an awful mess like this:

(Screenshot: the resulting --container-env value)
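For illustration, it ends up something like this (a hedged sketch - the endpoint and credentials are placeholders, and the ^##^ prefix uses gcloud’s delimiter-override syntax to separate the key=value pairs with ## instead of commas):

        […]
        --container-env='^##^CONNECT_BOOTSTRAP_SERVERS=MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092##CONNECT_SECURITY_PROTOCOL=SASL_SSL##CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="CCLOUD_USER" password="CCLOUD_PASSWORD";##[…]' \
        […]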

It’s not pretty, and it’s a bit of a bugger to debug. You can pass in a separate file holding environment values, but I’m always keen on keeping things self-contained if possible. So, since I was overriding the command to run at container launch anyway, I set the environment variables at that point instead:

        […]
        --container-command=/bin/bash \
        --container-arg=-c \
        --container-arg='set -x
        #
        # Set the environment variables
        export CONNECT_REST_ADVERTISED_HOST_NAME=rmoff-connect-source-v01
        […]
        #
        […]
        #
        sleep infinity'

Most important is to finish with sleep infinity so that the container does not exit (since the Kafka Connect worker process is forked to the background).

It needs some tricky escaping, both of the curl data (-d) block and of the quoted passages within it. Because the whole script is already wrapped in single quotes, the JSON payload is delimited with '"'"' sequences - each one closes the single-quoted string, appends a single quote (inside double quotes), and reopens the single-quoted string. Here is the final shell invocation:

gcloud compute instances create-with-container rmoff-connect-source-v01 \
        --zone=us-east1-b \
        --tags kafka-connect \
        --metadata=google-logging-enabled=true \
        --container-image confluentinc/cp-kafka-connect-base:6.0.1 \
        --container-restart-policy=never \
        --container-command=/bin/bash \
        --container-arg=-c \
        --container-arg='set -x
        #
        # Set the environment variables
        export CONNECT_CUB_KAFKA_TIMEOUT=300
        export CONNECT_BOOTSTRAP_SERVERS=MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092
        export CONNECT_REST_ADVERTISED_HOST_NAME=rmoff-connect-source-v01
        export CONNECT_REST_PORT=8083
        export CONNECT_GROUP_ID=kafka-connect-group-gcp-v01
        export CONNECT_CONFIG_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-configs
        export CONNECT_OFFSET_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-offsets
        export CONNECT_STATUS_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-status
        export CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
        export CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
        export CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
        export CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
        export CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
        export CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
        export CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
        export CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
        export CONNECT_RETRY_BACKOFF_MS=500
        export CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
        export CONNECT_SASL_MECHANISM=PLAIN
        export CONNECT_SECURITY_PROTOCOL=SASL_SSL
        export CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
        export CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
        export CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
        export CONNECT_CONSUMER_RETRY_BACKOFF_MS=500
        export CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
        export CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
        export CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
        export CONNECT_PRODUCER_RETRY_BACKOFF_MS=500
        export CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
        export CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
        export CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
        #
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
        while : ; do
            curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
            echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
            if [ $curl_status -eq 200 ] ; then
                break
            fi
            sleep 5
        done
        echo -e "\n--\n+> Creating Kafka Connect source connectors"
        curl -s -X PUT -H "Content-Type:application/json" \
        http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/config \
            -d '"'"'{
                "connector.class" : "io.confluent.connect.activemq.ActiveMQSourceConnector",
                "activemq.url" : "tcp://my-activemq-endpoint:61619",
                "activemq.username" : "ACTIVEMQ_USER",
                "activemq.password" : "ACTIVEMQ_PASSWORD",
                "jms.destination.type" : "topic",
                "jms.destination.name" : "TRAIN_MVT_EA_TOC",
                "kafka.topic" : "networkrail_TRAIN_MVT_v01",
                "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable" : "false",
                "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable" : "false",
                "topic.creation.default.partitions" : 1,
                "topic.creation.default.replication.factor" : 3,
                "confluent.license" : "",
                "confluent.topic.bootstrap.servers" : "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092",
                "confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'CCLOUD_USER'\" password=\"'CCLOUD_PASSWORD'\";",
                "confluent.topic.security.protocol" : "SASL_SSL",
                "confluent.topic.ssl.endpoint.identification.algorithm": "https",
                "confluent.topic.sasl.mechanism" : "PLAIN",
                "confluent.topic.request.timeout.ms" : "20000",
                "confluent.topic.retry.backoff.ms" : "500"
            }'"'"'
        #
        sleep infinity'

Container logs

You can use the rather useful gcloud compute ssh to connect directly to the VM that’s been launched:

gcloud compute ssh --zone "us-east1-b" "rmoff-connect-source-v01"

If you run it too soon after launch you’ll get an error, because the SSH key hasn’t yet propagated to the instance:

Warning: Permanently added 'compute.8428359303178581516' (ED25519) to the list of known hosts.
rmoff@34.75.11.50: Permission denied (publickey).
ERROR: (gcloud.compute.ssh) [/usr/bin/ssh] exited with return code [255].

Once the VM is running properly you’ll get a shell prompt:

  ########################[Welcome]#########################
  # You have logged in to the guest OS.                    #
  # To access your containers use 'docker attach' command  #
  ##########################################################

rmoff@rmoff-connect-source-v01 ~ $

From here, you can see the containers running on the VM. To start with you’ll see a couple of internal ones (stackdriver-logging-agent, konlet):

rmoff@rmoff-connect-source-v01 ~ $ docker ps
CONTAINER ID   IMAGE                                                                 COMMAND                  CREATED          STATUS          PORTS   NAMES
4a04df77a0be   gcr.io/gce-containers/konlet:v.0.11-latest                            "/bin/gce-containers…"   35 seconds ago   Up 32 seconds           pedantic_tu
0d008a624e56   gcr.io/stackdriver-agents/stackdriver-logging-agent:0.2-1.5.33-1-1    "/entrypoint.sh /usr…"   2 days ago       Up 2 days               stackdriver-logging-agent

and soon after, the actual container that you’ve configured to run:

rmoff@rmoff-connect-source-v01 ~ $ docker ps
CONTAINER ID   IMAGE                                      COMMAND                  CREATED          STATUS                             PORTS   NAMES
1e349180aa20   confluentinc/cp-kafka-connect-base:6.0.1   "/bin/bash -c 'set -…"   33 seconds ago   Up 30 seconds (health: starting)           klt-rmoff-connect-source-v01-qjez

At this point you’re just in normal Docker world, and can look at the logs as you would locally:

rmoff@rmoff-connect-source-v01 ~ $ docker logs -f klt-rmoff-connect-source-v01-qjez|more
+ export CONNECT_CUB_KAFKA_TIMEOUT=300
+ CONNECT_CUB_KAFKA_TIMEOUT=300
[…]
Installing connector plugins
+ echo 'Installing connector plugins'
+ confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
Running in a "--no-prompt" mode
[…]
[2021-01-11 21:56:38,614] INFO [Worker clientId=connect-1, groupId=kafka-connect-group-gcp-v01] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[…]

With all of this done, you should now see on your Confluent Cloud cluster both the internal Kafka Connect worker topics and the topics populated by the connector:

(Screenshot: the internal Kafka Connect worker topics in Confluent Cloud)

(Screenshot: the topic populated by the connector in Confluent Cloud)
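You can also check this from the command line with the ccloud CLI rather than the web UI (a sketch, assuming you’ve already run ccloud login and selected the right environment and cluster):

ccloud kafka topic list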

When you want to shut down the VM you can use delete:

gcloud compute instances delete --zone "us-east1-b" "rmoff-connect-source-v01"
