Using ksqlDB in Confluent Cloud makes things a whole bunch easier because now you just get to build apps and streaming pipelines, instead of having to run and manage a bunch of infrastructure yourself.
Once you’ve got ksqlDB provisioned on Confluent Cloud you can use the web-based editor to build and run queries. You can also connect to it using the REST API and the ksqlDB CLI tool. Here’s how.
Creating the ksqlDB API key
You need to generate an API keypair for your ksqlDB instance (known as an application). This is separate from the API keypair that you use for your Kafka brokers on Confluent Cloud.
Using the Confluent CLI, first authenticate:
$ confluent login
…
Logged in as "[email protected]".
Using environment "t4242" ("default").
Then list out the ksqlDB application(s) present:
$ confluent ksql cluster list
Id | Name | Topic Prefix | Kafka | Storage | Endpoint | Status
+--------------+----------+--------------+-----------+---------+------------------------------------------------------------+--------+
lksqlc-1234 | trains | pksqlc-***** | lkc-***** | 500 | https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443 | UP
lksqlc-***** | ais | pksqlc-***** | lkc-***** | 500 | https://pksqlc-****.us-west2.gcp.confluent.cloud:443 | UP
lksqlc-***** | carparks | pksqlc-***** | lkc-***** | 500 | https://pksqlc-****.us-east1.gcp.confluent.cloud:443 | UP
Make a note of the Endpoint, as well as the Id of the ksqlDB application to which you want to connect. Specify the Id as the value for --resource in this command, which will create the keypair:
$ confluent api-key create --resource lksqlc-1234
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | FGVYFW3ER4W4AONO |
| Secret | ude+PKSIHkrl3/nn32ikkesiaIMlfPw37qGaEx1Jy9zXMVRqTUYmKaIKU5gD5pw0 |
+---------+------------------------------------------------------------------+
Protect this key as it allows access to your data & processing! Don’t do anything daft like, say, publish it in a blog on the internet 😉 (…without revoking it first)
You may choose to store the relevant connection details in a local .env file. This is up to you, but I’m doing it here because it makes things more reusable.
# This is a .env file
CCLOUD_KSQL_API_KEY=FGVYFW3ER4W4AONO
CCLOUD_KSQL_API_SECRET=ude+PKSIHkrl3/nn32ikkesiaIMlfPw37qGaEx1Jy9zXMVRqTUYmKaIKU5gD5pw0
CCLOUD_KSQL_ENDPOINT=https://pksqlc-1234.europe-north1.gcp.confluent.cloud:443
Once created you can load the environment variables into your local session by running
source .env
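One thing to be aware of: a plain source sets the variables in your current shell but does not export them, so a child process (such as your own application) won’t see them. Here’s a minimal sketch of a loader that exports them and checks that nothing is missing. The function names load_env and require_ksql_env are made up for illustration, and it assumes the .env file holds simple KEY=value lines like the one above.

```shell
# Load KEY=value pairs from a .env-style file and export them,
# so child processes (not just this shell) can read them.
load_env() {
  env_file="${1:-./.env}"
  # A bare filename may be looked up on PATH by ".", so force a path.
  case "$env_file" in
    */*) ;;
    *) env_file="./$env_file" ;;
  esac
  if [ ! -f "$env_file" ]; then
    echo "load_env: $env_file not found" >&2
    return 1
  fi
  set -a          # auto-export every variable assigned from here on
  . "$env_file"
  set +a          # back to normal behaviour
}

# Report any connection detail that is still unset or empty.
require_ksql_env() {
  missing=0
  for name in CCLOUD_KSQL_API_KEY CCLOUD_KSQL_API_SECRET CCLOUD_KSQL_ENDPOINT; do
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "require_ksql_env: $name is not set" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage (not run here):
#   load_env .env && require_ksql_env
```

Since the file holds a secret, it’s also worth restricting its permissions (for example chmod 600 .env) and keeping it out of version control.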
Connecting to ksqlDB on Confluent Cloud from local CLI
You can install ksqlDB locally as part of the Confluent Platform download, or just run it as a Docker container. Here I’m running it as a temporary container that’s deleted when it exits, using the ksqlDB endpoint and authentication details saved in a .env file as shown above.
source .env
docker run --interactive --tty --rm \
confluentinc/ksqldb-server:0.28.2 \
ksql -u $CCLOUD_KSQL_API_KEY \
-p $CCLOUD_KSQL_API_SECRET \
$CCLOUD_KSQL_ENDPOINT
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= The Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2022 Confluent Inc.
CLI v0.28.2, Server v0.28.2-rc8 located at https://pksqlc-m5ny7.us-west4.gcp.confluent.cloud:443
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
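If you don’t need the interactive prompt, the CLI can also run a statement and exit. The following is a sketch under a couple of assumptions: that your CLI version supports the --execute option, and that you’re happy to wrap it in a function (ksql_exec is a name I’ve made up). The DOCKER variable is a hypothetical override that lets you preview the command with DOCKER=echo before running it for real.

```shell
# Run one statement through the ksqlDB CLI in a throwaway container, then exit.
# No --interactive/--tty here because there is no prompt to interact with.
# DOCKER is a hypothetical override, e.g. DOCKER=echo to preview the command.
ksql_exec() {
  "${DOCKER:-docker}" run --rm \
    confluentinc/ksqldb-server:0.28.2 \
    ksql -u "$CCLOUD_KSQL_API_KEY" \
         -p "$CCLOUD_KSQL_API_SECRET" \
         --execute "$1" \
         -- "$CCLOUD_KSQL_ENDPOINT"
}

# Usage (not run here):
#   source .env
#   ksql_exec "SHOW STREAMS;"
```

Quoting the variables keeps the command intact even if a secret ever contains characters that the shell would otherwise split or expand.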
Connecting to the ksqlDB REST API on Confluent Cloud
ksqlDB has a rich REST API that you can use for creating and querying objects in ksqlDB. You can use it directly from a tool like curl, or embedded within your own application.
The first thing to do is 'smoke test' the connection and make sure you have the correct authentication details and endpoint. Here I’m using HTTPie:
$ http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/info
HTTP/1.1 200 OK
content-length: 130
content-type: application/json
{
"KsqlServerInfo": {
"kafkaClusterId": "lkc-v7vwnz",
"ksqlServiceId": "pksqlc-m5ny7",
"serverStatus": "RUNNING",
"version": "0.28.2-rc8"
}
}
The same thing works with curl (just not as natively pretty-printed 😃):
$ curl -u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/info
{"KsqlServerInfo":{"version":"0.28.2-rc8","kafkaClusterId":"lkc-v7vwnz","ksqlServiceId":"pksqlc-m5ny7","serverStatus":"RUNNING"}}
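If you want to script that smoke test, for example as a pre-flight check in a deployment script, something like this could work. It’s a minimal sketch: ksql_server_status and ksql_smoke_test are names I’ve invented, and the sed expression assumes the /info response contains a serverStatus field as in the output above. A proper JSON parser would be more robust if you have one to hand.

```shell
# Extract the serverStatus value from an /info response read on stdin.
# Works on both the compact and the pretty-printed form of the JSON.
ksql_server_status() {
  sed -n 's/.*"serverStatus"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Succeed only when the endpoint answers and reports RUNNING.
# --fail makes curl return non-zero on HTTP errors such as bad credentials.
ksql_smoke_test() {
  status=$(curl --silent --show-error --fail \
    -u "$CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET" \
    "$CCLOUD_KSQL_ENDPOINT/info" | ksql_server_status)
  [ "$status" = "RUNNING" ]
}

# Usage (not run here):
#   ksql_smoke_test && echo "ksqlDB is reachable"
```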
The /ksql endpoint is used to run statements, such as listing topics:
echo '{"ksql": "SHOW TOPICS;", "streamsProperties": {}}' | \
http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql
HTTP/1.1 200 OK
content-length: 976
content-type: application/json
[
{
"@type": "kafka_topics",
"statementText": "SHOW TOPICS;",
"topics": [
{
"name": "_kafka-connect-group-gcp-v11-configs",
"replicaInfo": [
3
]
},
{
"name": "_kafka-connect-group-gcp-v11-offsets",
"replicaInfo": [
[…]
You also use the /ksql endpoint to run statements which create tables and streams. This is how you can programmatically deploy ksqlDB applications and pipelines.
This looks a bit grim because of all the quoting, but the concept is still simple.
echo '{"ksql":"CREATE STREAM LOCATIONS_RAW WITH (KAFKA_TOPIC='"'"'ukrail-locations'"'"', FORMAT='"'"'AVRO'"'"');", "streamsProperties": {}}' | \
http -a $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/ksql
[
{
"@type": "currentStatus",
"statementText": "CREATE STREAM LOCATIONS_RAW (ROWKEY STRING KEY, LOCATION_ID STRING, NAME STRING, DESCRIPTION STRING, TIPLOC STRING, CRS STRING, NLC STRING, STANOX STRING, NOTES STRING, LONGITUDE STRING, LATITUDE STRING, ISOFFNETWORK STRING, TIMINGPOINTTYPE STRING) WITH (FORMAT='AVRO', KAFKA_TOPIC='ukrail-locations', KEY_SCHEMA_ID=100092, VALUE_SCHEMA_ID=100093);",
"commandId": "stream/`LOCATIONS_RAW`/create",
"commandStatus": {
"status": "SUCCESS",
"message": "Stream created",
"queryId": null
},
"commandSequenceNumber": 2,
"warnings": []
}
]
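One way to dodge the nested quoting is to let a small shell function build the JSON for you. This is a sketch rather than anything official: ksql_payload is a hypothetical name, and it assumes a single-line statement, since it only escapes backslashes and double quotes. Single quotes need no escaping inside a JSON string, so the SQL can be written naturally inside shell double quotes.

```shell
# Wrap a ksqlDB statement in the JSON body that the /ksql endpoint expects.
# Assumes the statement is on one line; newlines are not escaped.
ksql_payload() {
  # Escape backslashes first, then double quotes, for a valid JSON string.
  stmt=$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')
  printf '{"ksql": "%s", "streamsProperties": {}}\n' "$stmt"
}

# Usage (not run here):
#   ksql_payload "CREATE STREAM LOCATIONS_RAW WITH (KAFKA_TOPIC='ukrail-locations', FORMAT='AVRO');" | \
#     curl -u "$CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET" \
#          -H 'Content-Type: application/json' \
#          -d @- "$CCLOUD_KSQL_ENDPOINT/ksql"
```

Keeping the payload builder separate from the HTTP call also means you can print and eyeball the JSON before sending anything.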
To query a stream you use the /query-stream endpoint. Note that this endpoint requires HTTP/2, which (as far as I can tell) HTTPie does not support, so I’m showing curl here. Also note that the API payload is different: sql instead of ksql, and properties instead of streamsProperties:
curl -u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET $CCLOUD_KSQL_ENDPOINT/query-stream -d '{"sql":"SELECT * FROM LOCATIONS_RAW EMIT CHANGES LIMIT 5;", "properties": { "ksql.streams.auto.offset.reset": "earliest" }}'
{"queryId":"dc3ca802-1577-4d93-93c3-a4e9f3aa2654","columnNames":["ROWKEY","LOCATION_ID","NAME","DESCRIPTION","TIPLOC","CRS","NLC","STANOX","NOTES","LONGITUDE","LATITUDE","ISOFFNETWORK","TIMINGPOINTTYPE"],"columnTypes":["STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING","STRING"]}
["2506","2506","Atos C Interface","Atos C Interface","","","1800","","null","null","null","null","null"]
["2510","2510","Tflb Interface","Tflb Interface","","","2200","","null","null","null","null","null"]
["2514","2514","Hq Input Spare","Hq Input Ttl Inward Spare","","","2600","","null","null","null","null","null"]
["2516","2516","","Capcard 2 (Test Purpose Only)","","","2800","","null","null","null","null","null"]
["2522","2522","","Dunfermline","","","3323","","null","null","null","null","null"]
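Because the /query-stream payload uses different field names from /ksql, it can help to give it its own builder. The sketch below makes some assumptions: query_stream_payload and json_escape are hypothetical names, the statement is on a single line, and the only property you want to set is the offset reset (defaulting to earliest, as in the example above).

```shell
# Escape backslashes and double quotes so the text is a valid JSON string.
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# Build the /query-stream body: "sql" and "properties",
# not "ksql" and "streamsProperties" as used by /ksql.
# $1: SQL statement; $2: auto.offset.reset value (optional).
query_stream_payload() {
  printf '{"sql": "%s", "properties": {"ksql.streams.auto.offset.reset": "%s"}}\n' \
    "$(json_escape "$1")" "${2:-earliest}"
}

# Usage (not run here):
#   query_stream_payload "SELECT * FROM LOCATIONS_RAW EMIT CHANGES LIMIT 5;" | \
#     curl --http2 -u "$CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET" \
#          -d @- "$CCLOUD_KSQL_ENDPOINT/query-stream"
```

A curl build with HTTP/2 support will usually negotiate it over HTTPS anyway; passing --http2 just makes the intent explicit. In the response, the first line is the header object with the query ID and column names, and each line after it is one row as a JSON array.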