Using ksqlDB in Confluent Cloud makes things a whole bunch easier because you get to just build apps and streaming pipelines, instead of having to run and manage the infrastructure yourself.
Once you’ve got ksqlDB provisioned on Confluent Cloud you can use the web-based editor to build and run queries. You can also connect to it using the REST API and the ksqlDB CLI tool. Here’s how.
Creating the ksqlDB API key
You need to generate an API keypair for your ksqlDB instance (known as an application). This is a separate keypair from the one you use for your Kafka brokers on Confluent Cloud.
Using the Confluent Cloud CLI, first authenticate:
$ ccloud login
…
Logged in as "rick@nevergonnagiveyouup.com".
Using environment "t4242" ("default").
Then list out the ksqlDB application(s) present:
$ ccloud ksql app list
Id | Name | Topic Prefix | Kafka | Storage | Endpoint | Status
+--------------+----------+--------------+-----------+---------+------------------------------------------------------------+--------+
lksqlc-1234 | trains | pksqlc-***** | lkc-***** | 500 | https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443 | UP
lksqlc-***** | ais | pksqlc-***** | lkc-***** | 500 | https://pksqlc-****.us-west2.gcp.confluent.cloud:443 | UP
lksqlc-***** | carparks | pksqlc-***** | lkc-***** | 500 | https://pksqlc-****.us-east1.gcp.confluent.cloud:443 | UP
Make a note of the Endpoint, as well as the Id of the ksqlDB application to which you want to connect. Specify the Id as the value for --resource in this command, which creates the keypair:
$ ccloud api-key create --resource lksqlc-1234
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | FGVYFW3ER4W4AONO |
| Secret | ude+PKSIHkrl3/nn32ikkesiaIMlfPw37qGaEx1Jy9zXMVRqTUYmKaIKU5gD5pw0 |
+---------+------------------------------------------------------------------+
NOTE: Protect this key as it allows access to your data & processing! Don’t do anything daft like, say, publish it in a blog on the internet 😉 (…without revoking it first)
You may choose to store the relevant connection details in a local .env file. This is up to you, but I’m doing it here because it makes things more reusable.
# This is a .env file
CCLOUD_KSQL_API_KEY=FGVYFW3ER4W4AONO
CCLOUD_KSQL_API_SECRET=ude+PKSIHkrl3/nn32ikkesiaIMlfPw37qGaEx1Jy9zXMVRqTUYmKaIKU5gD5pw0
CCLOUD_KSQL_ENDPOINT=https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443
Once created, you can load the environment variables into your local session by running
source .env
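Once sourced, it’s worth a quick sanity check that all three variables are actually set before you rely on them. A minimal sketch (the values here are placeholders, not real credentials):

```shell
# Write a sample .env with placeholder values (substitute your own)
cat > .env <<'EOF'
CCLOUD_KSQL_API_KEY=XXXXXXXXXXXXXXXX
CCLOUD_KSQL_API_SECRET=XXXXXXXXXXXXXXXX
CCLOUD_KSQL_ENDPOINT=https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443
EOF

source .env

# Fail fast if any variable is missing or empty (bash indirection)
for v in CCLOUD_KSQL_API_KEY CCLOUD_KSQL_API_SECRET CCLOUD_KSQL_ENDPOINT; do
  [ -n "${!v}" ] || { echo "Missing $v" >&2; exit 1; }
done
echo "All ksqlDB connection variables are set"
```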
Connecting to ksqlDB on Confluent Cloud from local CLI
You can install ksqlDB locally as part of the Confluent Platform download, or just run it as a Docker container. Here I’m running it as a temporary container that’s deleted when it exits, using the ksqlDB endpoint and authentication details saved in a .env file as shown above.
source .env
docker run --interactive --tty --rm \
confluentinc/ksqldb-server:0.15.0 \
ksql -u $CCLOUD_KSQL_API_KEY \
-p $CCLOUD_KSQL_API_SECRET \
$CCLOUD_KSQL_ENDPOINT
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.15.0, Server v0.15.0-rc863 located at https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Connecting to the ksqlDB REST API on Confluent Cloud
ksqlDB has a rich REST API that you can use for creating and querying objects in ksqlDB. You can use it directly from a tool like curl, or embedded within your own application.
The first thing to do is 'smoke test' the connection and make sure you have the correct authentication details and endpoint. Here I’m using HTTPie:
$ http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/info
HTTP/1.1 200 OK
content-length: 130
content-type: application/json
{
"KsqlServerInfo": {
"kafkaClusterId": "lkc-*****",
"ksqlServiceId": "pksqlc-1234",
"serverStatus": "RUNNING",
"version": "0.15.0-rc863"
}
}
The same thing works with curl (just not as natively pretty-printed 😃):
$ curl -u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/info
{"KsqlServerInfo":{"version":"0.15.0-rc863","kafkaClusterId":"lkc-*****","ksqlServiceId":"pksqlc-1234","serverStatus":"RUNNING"}}
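If you want curl’s output pretty-printed too, you can pipe it through a formatter — for example Python’s built-in json.tool (this assumes python3 is on your PATH; jq works just as well if you have it):

```shell
# Pipe compact JSON through python3 -m json.tool to indent it
echo '{"KsqlServerInfo":{"serverStatus":"RUNNING","version":"0.15.0-rc863"}}' \
  | python3 -m json.tool
```

In the real call you’d just append `| python3 -m json.tool` after the curl command shown above.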
The /ksql endpoint is used to run statements, such as listing topics:
echo '{"ksql": "SHOW TOPICS;", "streamsProperties": {}}' | \
http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql
HTTP/1.1 200 OK
content-length: 976
content-type: application/json
[
{
"@type": "kafka_topics",
"statementText": "SHOW TOPICS;",
"topics": [
{
"name": "_kafka-connect-group-gcp-v11-configs",
"replicaInfo": [
3
]
},
{
"name": "_kafka-connect-group-gcp-v11-offsets",
"replicaInfo": [
[…]
You also use the /ksql endpoint to run statements which create tables and streams. This is how you can programmatically deploy ksqlDB applications and pipelines. This looks a bit grim because of all the quoting, but the concept is still simple.
echo '{"ksql":"CREATE STREAM LOCATIONS_RAW WITH (KAFKA_TOPIC='"'"'ukrail-locations'"'"', FORMAT='"'"'AVRO'"'"');", "streamsProperties": {}}' | \
http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql
[
{
"@type": "currentStatus",
"statementText": "CREATE STREAM LOCATIONS_RAW (ROWKEY STRING KEY, LOCATION_ID STRING, NAME STRING, DESCRIPTION STRING, TIPLOC STRING, CRS STRING, NLC STRING, STANOX STRING, NOTES STRING, LONGITUDE STRING, LATITUDE STRING, ISOFFNETWORK STRING, TIMINGPOINTTYPE STRING) WITH (FORMAT='AVRO', KAFKA_TOPIC='ukrail-locations', KEY_SCHEMA_ID=100092, VALUE_SCHEMA_ID=100093);",
"commandId": "stream/`LOCATIONS_RAW`/create",
"commandStatus": {
"status": "SUCCESS",
"message": "Stream created",
"queryId": null
},
"commandSequenceNumber": 2,
"warnings": []
}
]
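If the inline quoting feels too grim, one option (a sketch, not the only way) is to build the payload in a heredoc, so the SQL’s single quotes don’t need escaping at all:

```shell
# Build the /ksql payload in a heredoc - the quoted 'EOF' marker means
# nothing inside is expanded, so the SQL's single quotes survive as-is
PAYLOAD=$(cat <<'EOF'
{"ksql":"CREATE STREAM LOCATIONS_RAW WITH (KAFKA_TOPIC='ukrail-locations', FORMAT='AVRO');", "streamsProperties": {}}
EOF
)
echo "$PAYLOAD"

# Then send it exactly as before:
# echo "$PAYLOAD" | http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql
```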
To query a stream you use the /query-stream endpoint. Note that you have to use HTTP/2 for this, which (as far as I can tell) HTTPie does not support, so I’m showing curl here. Also note that the API payload is different: sql instead of ksql, and properties instead of streamsProperties:
curl -u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/query-stream -d '{"sql":"SELECT * FROM LOCATIONS_RAW EMIT CHANGES LIMIT 5;", "properties": { "ksql.streams.auto.offset.reset": "earliest" }}'
{"queryId":"dc3ca802-1577-4d93-93c3-a4e9f3aa2654","columnNames":["ROWKEY","LOCATION_ID","NAME","DESCRIPTION","TIPLOC","CRS","NLC","STANOX","NOTES","LONGITUDE","LATITUDE","ISOFFNETWORK","TIMINGPOINTTYPE"],"columnTypes":["STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING"]}
["2506","2506","Atos C Interface","Atos C Interface","","","1800","","null","null","null","null","null"]
["2510","2510","Tflb Interface","Tflb Interface","","","2200","","null","null","null","null","null"]
["2514","2514","Hq Input Spare","Hq Input Ttl Inward Spare","","","2600","","null","null","null","null","null"]
["2516","2516","","Capcard 2 (Test Purpose Only)","","","2800","","null","null","null","null","null"]
["2522","2522","","Dunfermline","","","3323","","null","null","null","null","null"]
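Note the shape of the /query-stream response: the first line is a JSON header carrying the query id, column names, and column types, and every following line is one row as a JSON array. A small local sketch of splitting the two (the sample file here is abbreviated from the output above):

```shell
# Save an abbreviated /query-stream response: one header line, then rows
cat > response.jsonl <<'EOF'
{"queryId":"dc3ca802-1577-4d93-93c3-a4e9f3aa2654","columnNames":["ROWKEY","NAME"],"columnTypes":["STRING","STRING"]}
["2506","Atos C Interface"]
["2510","Tflb Interface"]
EOF

head -n 1 response.jsonl    # the schema header
tail -n +2 response.jsonl   # the data rows, one JSON array per line
```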