DEV Community

Confluent

Connecting to managed ksqlDB in Confluent Cloud with REST and ksqlDB CLI

Robin Moffatt
Robin Moffatt is a Developer Advocate at Confluent, and regular conference speaker. He also likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
Originally published at rmoff.net on ・5 min read

Using ksqlDB in Confluent Cloud makes things a whole bunch easier: you just get to build apps and streaming pipelines, instead of having to run and manage the infrastructure yourself.

Once you’ve got ksqlDB provisioned on Confluent Cloud you can use the web-based editor to build and run queries. You can also connect to it using the REST API and the ksqlDB CLI tool. Here’s how.

Creating the ksqlDB API key

You need to generate an API keypair for your ksqlDB instance (which Confluent Cloud calls an application). This is a different keypair from the one you use for your Kafka cluster on Confluent Cloud.

Using the Confluent Cloud CLI, first authenticate:

$ ccloud login
…
Logged in as "rick@nevergonnagiveyouup.com".
Using environment "t4242" ("default").

Then list out the ksqlDB application(s) present:

$ ccloud ksql app list

       Id      |   Name   | Topic Prefix |   Kafka   | Storage |                          Endpoint                          | Status
+--------------+----------+--------------+-----------+---------+------------------------------------------------------------+--------+
  lksqlc-1234  | trains   | pksqlc-***** | lkc-***** |     500 | https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443 | UP
  lksqlc-***** | ais      | pksqlc-***** | lkc-***** |     500 | https://pksqlc-****.us-west2.gcp.confluent.cloud:443      | UP
  lksqlc-***** | carparks | pksqlc-***** | lkc-***** |     500 | https://pksqlc-****.us-east1.gcp.confluent.cloud:443      | UP

Make a note of the Endpoint and the Id of the ksqlDB application that you want to connect to. Pass the Id as the value of --resource in this command, which creates the keypair:

$ ccloud api-key create --resource lksqlc-1234
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | FGVYFW3ER4W4AONO                                                 |
| Secret  | ude+PKSIHkrl3/nn32ikkesiaIMlfPw37qGaEx1Jy9zXMVRqTUYmKaIKU5gD5pw0 |
+---------+------------------------------------------------------------------+

NOTE: Protect this key as it allows access to your data & processing! Don’t do anything daft like, say, publish it in a blog on the internet 😉 (…without revoking it first)

You may choose to store the relevant connection details in a local .env file. This is up to you, but I’m doing it here because it makes the commands that follow easier to reuse.

# This is a .env file
CCLOUD_KSQL_API_KEY=FGVYFW3ER4W4AONO
CCLOUD_KSQL_API_SECRET=ude+PKSIHkrl3/nn32ikkesiaIMlfPw37qGaEx1Jy9zXMVRqTUYmKaIKU5gD5pw0
CCLOUD_KSQL_ENDPOINT=https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443

Once you’ve created the file, load the variables into your current shell session by running:

source .env
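Plain source .env sets the variables in your current shell without exporting them. That’s enough for the commands in this post, because your shell expands them before running docker, http or curl. If you also want them visible to child processes (a script you run afterwards, for instance) and want an early warning when a value is missing, here’s a minimal bash sketch. The function name load_ccloud_ksql_env is hypothetical, made up for this example; the three variable names are simply the ones from the .env file above.

```bash
# Hypothetical helper (not part of ccloud or ksqlDB): load a .env file with
# every assignment exported, then check the three ksqlDB connection settings.
load_ccloud_ksql_env() {
  local env_file="${1:-.env}" var missing=0

  set -a                                  # auto-export variables assigned from here on
  # shellcheck disable=SC1090
  source "$env_file" || { set +a; return 1; }
  set +a                                  # back to normal (non-exporting) behaviour

  for var in CCLOUD_KSQL_API_KEY CCLOUD_KSQL_API_SECRET CCLOUD_KSQL_ENDPOINT; do
    # ${!var} is bash indirect expansion: the value of the variable named in $var
    if [ -z "${!var:-}" ]; then
      echo "Missing $var in $env_file" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

You’d then run it in place of the plain source command:

load_ccloud_ksql_env .env || echo "Fix your .env before continuing"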

Connecting to ksqlDB on Confluent Cloud from local CLI

You can install ksqlDB locally as part of the Confluent Platform download, or just run it as a Docker container. Here I’m running it as a temporary container that’s deleted when it exits (that’s what --rm does). I’m using the ksqlDB endpoint and authentication details saved in the .env file shown above.

source .env

docker run --interactive --tty --rm \
  confluentinc/ksqldb-server:0.15.0 \
  ksql -u $CCLOUD_KSQL_API_KEY \
       -p $CCLOUD_KSQL_API_SECRET \
          $CCLOUD_KSQL_ENDPOINT

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.15.0, Server v0.15.0-rc863 located at https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Connecting to the ksqlDB REST API on Confluent Cloud

ksqlDB has a rich REST API that you can use for creating and querying objects in ksqlDB. You can use it directly from a tool like curl, or embedded within your own application.

The first thing to do is to smoke-test the connection and make sure you have the correct authentication details and endpoint. Here I’m using HTTPie:

$ http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/info
HTTP/1.1 200 OK
content-length: 130
content-type: application/json

{
    "KsqlServerInfo": {
        "kafkaClusterId": "lkc-*****",
        "ksqlServiceId": "pksqlc-1234",
        "serverStatus": "RUNNING",
        "version": "0.15.0-rc863"
    }
}

The same thing works with curl (just not as natively pretty-printed 😃):

$ curl -u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/info
{"KsqlServerInfo":{"version":"0.15.0-rc863","kafkaClusterId":"lkc-*****","ksqlServiceId":"pksqlc-1234","serverStatus":"RUNNING"}}
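If you want to script this smoke test (say, as the first step of a deployment script), a minimal sketch is to check the /info response for a RUNNING serverStatus. Both function names below are hypothetical. The pattern accepts both the pretty-printed HTTPie output and curl’s compact JSON shown above. curl’s --fail option makes it exit with an error on HTTP error responses, such as a 401 from a wrong key or secret, and --silent hides the progress meter.

```bash
# Hypothetical helper: succeed only if a ksqlDB /info response (read on
# stdin) contains "serverStatus": "RUNNING". Whitespace around the colon is
# optional, so pretty-printed and compact JSON both match.
ksql_info_is_running() {
  grep -Eq '"serverStatus"[[:space:]]*:[[:space:]]*"RUNNING"'
}

# Hypothetical wrapper around the curl call above. If curl fails (e.g. bad
# credentials), grep sees no matching input and the function returns non-zero.
ksql_smoke_test() {
  curl --silent --fail \
    -u "$CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET" \
    "$CCLOUD_KSQL_ENDPOINT/info" | ksql_info_is_running
}
```

For example:

if ksql_smoke_test; then echo "ksqlDB is up"; else echo "Check your endpoint and API key" >&2; fi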

The /ksql endpoint is used to run statements, such as listing topics:

echo '{"ksql": "SHOW TOPICS;", "streamsProperties": {}}' | \
  http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql

HTTP/1.1 200 OK
content-length: 976
content-type: application/json

[
    {
        "@type": "kafka_topics",
        "statementText": "SHOW TOPICS;",
        "topics": [
            {
                "name": "_kafka-connect-group-gcp-v11-configs",
                "replicaInfo": [
                    3
                ]
            },
            {
                "name": "_kafka-connect-group-gcp-v11-offsets",
                "replicaInfo": [
[…]
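The response includes internal topics such as _kafka-connect-group-gcp-v11-configs. To pull out just the topic names, one option is a bit of text matching on the response body. This is a sketch: ksql_topic_names is a hypothetical name, and it uses grep and sed pattern matching rather than a real JSON parser, so it relies on "name" only appearing as the topic-name key, as it does in the SHOW TOPICS output above.

```bash
# Hypothetical helper: print the topic names from a SHOW TOPICS response
# (read on stdin), one per line. grep -o emits each "name": "..." pair on
# its own line; sed then keeps only the quoted value.
ksql_topic_names() {
  grep -oE '"name"[[:space:]]*:[[:space:]]*"[^"]*"' |
    sed -E 's/.*"([^"]*)"$/\1/'
}
```

For example (--body tells HTTPie to print only the response body):

echo '{"ksql": "SHOW TOPICS;", "streamsProperties": {}}' | \
  http --body -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql | \
  ksql_topic_names | grep -v '^_'

The final grep -v '^_' drops names starting with an underscore, such as the Kafka Connect internal topics above.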

You also use the /ksql endpoint to run statements that create tables and streams. This is how you can programmatically deploy ksqlDB applications and pipelines.

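This looks a bit grim because of all the quoting, but the concept is still simple. The whole JSON payload is wrapped in single quotes for the shell, so each '"'"' closes that single-quoted string, adds a literal single quote (itself wrapped in double quotes), and then reopens the single-quoted string. That’s how the single quotes around 'ukrail-locations' and 'AVRO' make it through to ksqlDB.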

echo '{"ksql":"CREATE STREAM LOCATIONS_RAW WITH (KAFKA_TOPIC='"'"'ukrail-locations'"'"', FORMAT='"'"'AVRO'"'"');", "streamsProperties": {}}' | \
  http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql

[
  {
    "@type": "currentStatus",
    "statementText": "CREATE STREAM LOCATIONS_RAW (ROWKEY STRING KEY, LOCATION_ID STRING, NAME STRING, DESCRIPTION STRING, TIPLOC STRING, CRS STRING, NLC STRING, STANOX STRING, NOTES STRING, LONGITUDE STRING, LATITUDE STRING, ISOFFNETWORK STRING, TIMINGPOINTTYPE STRING) WITH (FORMAT='AVRO', KAFKA_TOPIC='ukrail-locations', KEY_SCHEMA_ID=100092, VALUE_SCHEMA_ID=100093);",
    "commandId": "stream/`LOCATIONS_RAW`/create",
    "commandStatus": {
      "status": "SUCCESS",
      "message": "Stream created",
      "queryId": null
    },
    "commandSequenceNumber": 2,
    "warnings": []
  }
]
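One way to avoid the '"'"' gymnastics is to let a small shell function build the JSON body, so you can write the statement with ordinary single quotes inside a double-quoted string. Here’s a minimal bash sketch. json_escape and ksql_request_body are hypothetical names, and the escaping covers backslashes, double quotes, newlines, carriage returns and tabs only: enough for typical statements, but not a complete JSON encoder.

```bash
# Hypothetical helper: escape a string for use inside a JSON string literal.
# Handles backslash, double quote, newline, carriage return and tab; other
# control characters pass through unchanged, so keep them out of statements.
json_escape() {
  local s=$1 bs='\' q='"'
  s=${s//"$bs"/"$bs$bs"}   # backslashes first, so later escapes aren't doubled
  s=${s//"$q"/"$bs$q"}     # double quotes
  s=${s//$'\n'/"$bs"n}     # newlines, so multi-line statements survive
  s=${s//$'\r'/"$bs"r}     # carriage returns
  s=${s//$'\t'/"$bs"t}     # tabs
  printf '%s' "$s"
}

# Hypothetical helper: wrap one or more ksqlDB statements in a /ksql request body.
ksql_request_body() {
  printf '{"ksql":"%s","streamsProperties":{}}\n' "$(json_escape "$1")"
}
```

The CREATE STREAM request above then becomes:

ksql_request_body "CREATE STREAM LOCATIONS_RAW WITH (KAFKA_TOPIC='ukrail-locations', FORMAT='AVRO');" | \
  http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql

If you have jq installed, jq -n --arg sql "$STATEMENT" '{ksql: $sql, streamsProperties: {}}' does the same job with a proper JSON encoder (STATEMENT here is a hypothetical variable holding your statement).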

To query a stream, use the /query-stream endpoint. Note that this requires HTTP/2, which (as far as I can tell) HTTPie does not support, so I’m showing curl here. Also note that the request payload is different: it uses sql instead of ksql, and properties instead of streamsProperties:

curl -u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/query-stream -d '{"sql":"SELECT * FROM LOCATIONS_RAW EMIT CHANGES LIMIT 5;", "properties": { "ksql.streams.auto.offset.reset": "earliest" }}'

{"queryId":"dc3ca802-1577-4d93-93c3-a4e9f3aa2654","columnNames":["ROWKEY","LOCATION_ID","NAME","DESCRIPTION","TIPLOC","CRS","NLC","STANOX","NOTES","LONGITUDE","LATITUDE","ISOFFNETWORK","TIMINGPOINTTYPE"],"columnTypes":["STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING"]}
["2506","2506","Atos C Interface","Atos C Interface","","","1800","","null","null","null","null","null"]
["2510","2510","Tflb Interface","Tflb Interface","","","2200","","null","null","null","null","null"]
["2514","2514","Hq Input Spare","Hq Input Ttl Inward Spare","","","2600","","null","null","null","null","null"]
["2516","2516","","Capcard 2 (Test Purpose Only)","","","2800","","null","null","null","null","null"]
["2522","2522","","Dunfermline","","","3323","","null","null","null","null","null"]
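The response shown above is one JSON object of metadata (queryId, columnNames, columnTypes) followed by one JSON array per row. If you want it in a more spreadsheet-friendly shape, a minimal sketch is to turn columnNames into a header line and strip the brackets from each row, which gives rough CSV. The function name is hypothetical, and this is sed text processing that assumes the response has exactly the line-per-row layout shown above; it is not a JSON parser. Values stay as JSON-quoted strings (so the "null" values above remain the quoted string "null"), and a value containing an escaped double quote is not converted to CSV’s doubled-quote convention.

```bash
# Hypothetical helper: convert a /query-stream response (read on stdin) to
# rough CSV.
#   Line 1 (metadata object): keep only the columnNames list, minus quotes.
#   Lines 2..$ (one JSON array per row): strip the leading [ and trailing ].
ksql_query_to_csv() {
  sed -e '1{s/.*"columnNames":\[\([^]]*\)].*/\1/;s/"//g;}' \
      -e '2,${s/^\[//;s/]$//;}'
}
```

For example:

curl -s -u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/query-stream -d '{"sql":"SELECT * FROM LOCATIONS_RAW EMIT CHANGES LIMIT 5;", "properties": { "ksql.streams.auto.offset.reset": "earliest" }}' | \
  ksql_query_to_csv > locations.csv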
