Connecting Apache Flink SQL to Confluent Cloud Kafka broker

Published in Apache Flink, Confluent Cloud, Kcat at https://preview.rmoff.net/2025/07/22/connecting-apache-flink-sql-to-confluent-cloud-kafka-broker/

This is a quick blog post to remind me how to connect Apache Flink to a Kafka topic on Confluent Cloud. You may wonder why you’d want to do this, given that Confluent Cloud for Apache Flink is a much easier way to run Flink SQL. But, for whatever reason, you’re here and you want to understand the necessary incantations to get this connectivity to work.

There are two ways to do this: without the Schema Registry, and with the Schema Registry for Avro.

MVP: Just connect to a Kafka topic; no Avro.

Get the broker endpoint and API key

First off, you need to get two things:

  1. The address of your Confluent Cloud broker

  2. An API key pair with authorisation to access the topic that you want to read/write

I’m going to use the Confluent CLI to do this, but you can also use the web interface if you’d prefer (head to the Cluster Overview page).

Get a list of your Kafka clusters on Confluent Cloud:

$ confluent kafka cluster list
  Current |     ID     |   Name    |   Type    | Cloud |  Region   | Availability […]
----------+------------+-----------+-----------+-------+-----------+--------------[…]
  *       | lkc-qnygo6 | cluster00 | STANDARD  | aws   | us-west-2 | single-zone  […]
          | lkc-v2p3j0 | dev       | DEDICATED | aws   | us-west-2 | single-zone  […]

Then describe the cluster that holds your topic to get its Endpoint:

$ confluent kafka cluster describe lkc-qnygo6
+----------------------+---------------------------------------------------------+
| Current              | true                                                    |
| ID                   | lkc-qnygo6                                              |
| Name                 | cluster00                                               |
[…]
| Endpoint             | SASL_SSL://pkc-rgm37.us-west-2.aws.confluent.cloud:9092 |
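
Note that the Endpoint value carries a SASL_SSL:// prefix, whereas the bootstrap servers setting used in the table definitions later in this post is just host:port. A minimal sketch of trimming the prefix in the shell, with the endpoint hard-coded from the output above (the variable names are made up for illustration):

```shell
# Endpoint exactly as reported by `confluent kafka cluster describe`
ENDPOINT="SASL_SSL://pkc-rgm37.us-west-2.aws.confluent.cloud:9092"

# Strip everything up to and including "://" to leave host:port
BOOTSTRAP_SERVERS="${ENDPOINT#*://}"

echo "$BOOTSTRAP_SERVERS"
```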

Now we’ll create an API key for accessing this broker. Specify the ID of the cluster (e.g. lkc-qnygo6) as the resource to which the API key is to have access. If you have one already then you can skip this step.

$ confluent api-key create --resource lkc-qnygo6  --description "Flink SQL (Kafka)"
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+------------+------------------------------------------------------------------+
| API Key    | KAFKA_CLUSTER_API_KEY                                            |
| API Secret | KAFKA_CLUSTER_API_SECRET                                         |
+------------+------------------------------------------------------------------+

In this blog post I’m obviously not going to show my real API key details :)

Wherever you see KAFKA_CLUSTER_API_KEY (and the same for secrets, and Schema Registry) you can literally replace it with the actual value.

So in this line:

'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="KAFKA_CLUSTER_API_KEY"

If your API key value is bmV2ZXJnb25uYWdpdmV5b3V1cA==, you’d put it in like this:

'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="bmV2ZXJnb25uYWdpdmV5b3V1cA=="
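
If you’d rather not hand-edit the string, here is a rough sketch of building the full JAAS value from shell variables. The variable names and the secret are placeholders invented for this example; the login module class name is the one used in this post.

```shell
# Placeholder values; substitute your own API key and secret
KAFKA_CLUSTER_API_KEY="bmV2ZXJnb25uYWdpdmV5b3V1cA=="
KAFKA_CLUSTER_API_SECRET="not-a-real-secret"

# Build the JAAS string; each value sits inside double quotes
# and the whole thing ends with a semicolon
JAAS_CONFIG=$(printf 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="%s" password="%s";' \
    "$KAFKA_CLUSTER_API_KEY" "$KAFKA_CLUSTER_API_SECRET")

echo "$JAAS_CONFIG"
```

The printed value is what goes between the single quotes of 'properties.sasl.jaas.config'.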

If you’re writing to a topic that doesn’t exist yet you’ll need to create it first:

$ confluent kafka topic create test_table --cluster lkc-qnygo6
Created topic "test_table".

Then you can define a table over the topic:

CREATE TABLE test_table (
    id INT,
    col1 STRING)
WITH (
    'topic' = 'test_table',
    'connector' = 'kafka',
    'scan.bounded.mode' = 'unbounded',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'pkc-rgm37.us-west-2.aws.confluent.cloud:9092', -- (1)
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="KAFKA_CLUSTER_API_KEY" password="KAFKA_CLUSTER_API_SECRET";', -- (2)
    'format' = 'json'
    );
(1) The endpoint of your Kafka cluster, without the SASL_SSL:// prefix
(2) The API key and secret for accessing your Kafka cluster

…write data to it:

Flink SQL> INSERT INTO test_table VALUES (1,'a');

…and query it:

Flink SQL> SELECT * FROM test_table;
+----+-------------+--------------------------------+
| op |          id |                           col1 |
+----+-------------+--------------------------------+
| +I |           1 |                              a |

You can check the topic itself by consuming from it:

$ confluent api-key use KAFKA_CLUSTER_API_KEY
Using API Key "KAFKA_CLUSTER_API_KEY".

$ confluent kafka topic consume test_table --from-beginning
Starting Kafka Consumer. Use Ctrl-C to exit.
{"id":1,"col1":"a"}

Using Schema Registry and Avro

To use the Schema Registry you’ll need a couple more steps on top of the above:

  1. Get the Schema Registry endpoint and API key details

  2. Add the necessary JAR file to your Flink cluster

For the Schema Registry endpoint and API key pair it’s similar to what we did above for the Kafka cluster.

Get the Schema Registry endpoint:

$ confluent schema-registry cluster describe
+--------------------------------+----------------------------------------------------+
[…]
| Cluster                        | lsrc-g70zm3                                        |
| Endpoint URL                   | https://psrc-13go8y7.us-west-2.aws.confluent.cloud |

Create an API key pair for accessing the Schema Registry, using the Cluster ID of the Schema Registry (not your Kafka cluster ID) as the resource:

$ confluent api-key create --resource lsrc-g70zm3 --description "Flink SQL (SR)"
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+------------+------------------------------------------------------------------+
| API Key    | SCHEMA_REGISTRY_API_KEY                                          |
| API Secret | SCHEMA_REGISTRY_API_SECRET                                       |
+------------+------------------------------------------------------------------+

Apache Flink doesn’t ship with support for Schema Registry Avro, but it is supported as a separate component. If you’re using SQL Client (as I am here) you need to add the flink-sql-avro-confluent-registry JAR file to your deployment (e.g. by putting it in the /lib folder of your Flink cluster nodes). Make sure that you use the correct version of the file for the version of Flink that you’re running! The one linked to here is for 1.20.2.
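
As a sketch of keeping the JAR version in step with your Flink version, the download URL can be derived from a single variable. This assumes the JAR is published on Maven Central under the usual org.apache.flink coordinates, and that FLINK_HOME points at your Flink install; check both against your own setup.

```shell
# Assumption: set this to the Flink version your cluster is actually running
FLINK_VERSION="1.20.2"

# Assumption: standard Maven Central layout for the org.apache.flink group
JAR_NAME="flink-sql-avro-confluent-registry-${FLINK_VERSION}.jar"
JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/${FLINK_VERSION}/${JAR_NAME}"

echo "$JAR_URL"

# To download into Flink's lib folder (FLINK_HOME is an assumed variable):
#   curl -fLo "${FLINK_HOME}/lib/${JAR_NAME}" "$JAR_URL"
```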

With the JAR installed and your Flink cluster restarted, you’re ready to create a table on a topic serialised using Schema Registry Avro:

CREATE TABLE orders00 (
    `order_id` VARCHAR(2147483647) NOT NULL,
    `customer_id` INT NOT NULL,
    `product_id` VARCHAR(2147483647) NOT NULL,
    `price` DOUBLE NOT NULL
    )
WITH (
    'topic' = 'orders00',
    'connector' = 'kafka',
    'scan.bounded.mode' = 'unbounded',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'pkc-rgm37.us-west-2.aws.confluent.cloud:9092', -- (1)
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="KAFKA_CLUSTER_API_KEY" password="KAFKA_CLUSTER_API_SECRET";', -- (2)
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'https://psrc-13go8y7.us-west-2.aws.confluent.cloud', -- (3)
    'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'avro-confluent.basic-auth.user-info' = 'SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET' -- (4)
    );
(1) The endpoint of your Kafka cluster
(2) The API key and secret for accessing your Kafka cluster
(3) The endpoint of your Schema Registry
(4) The API key and secret for accessing your Schema Registry

Query the table:

Flink SQL> SELECT * FROM orders00 LIMIT 5;
+----+--------------------------------+-------------+-------------+---------+
| op |                       order_id | customer_id |  product_id |   price |
+----+--------------------------------+-------------+-------------+---------+
| +I | 8581b9ab-6c6c-42c0-8d69-252... |        3049 |        1349 |   29.32 |
| +I | 91b69bd2-8c23-4380-b6b4-dc0... |        3160 |        1477 |   43.59 |
| +I | e0f3d4af-a626-47a3-b8a1-c09... |        3245 |        1113 |   37.72 |
| +I | 3eec7aea-ff5a-4852-a357-1ce... |        3220 |        1401 |   92.44 |
| +I | 3fe99471-6ea8-4add-86f8-16e... |        3006 |        1195 |   68.66 |

Validating your cluster and API details using kcat

As well as the Confluent CLI, you can use kcat to check that the cluster details you’re specifying in your Flink SQL DDL are correct.

Here’s an example of querying the same orders00 topic as above, using kcat:

docker run --rm --tty edenhill/kcat:1.7.1 \
    -b pkc-rgm37.us-west-2.aws.confluent.cloud:9092 \
    -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN \
    -X sasl.username=KAFKA_CLUSTER_API_KEY -X sasl.password=KAFKA_CLUSTER_API_SECRET \
    -s avro -r https://SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET@psrc-13go8y7.us-west-2.aws.confluent.cloud \
    -C -t orders00 -c5
{"order_id": "670214e7-65e9-4d71-aabe-6b67a83c6a94", "customer_id": 3053, "product_id": "1269", "price": 38.409999999999997}
{"order_id": "4785a9f2-2529-4315-8f85-acd01ccf01de", "customer_id": 3205, "product_id": "1268", "price": 34.149999999999999}
{"order_id": "6126f74f-b72a-469e-b4f2-03961950e1aa", "customer_id": 3086, "product_id": "1260", "price": 77.280000000000001}
{"order_id": "f308e7cb-1be3-47bc-8fdc-42b6e8b7035e", "customer_id": 3196, "product_id": "1166", "price": 79.950000000000003}
{"order_id": "9908d516-441f-4db1-8378-f61713118c34", "customer_id": 3139, "product_id": "1429", "price": 48.0}
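
The Schema Registry details can also be checked on their own, separately from the broker. The sketch below assumes curl is installed and uses the Schema Registry REST API's GET /subjects call; check_schema_registry is a hypothetical helper name, and the block only defines it rather than running it.

```shell
# Values from this post; swap in your own endpoint and credentials
SR_URL="https://psrc-13go8y7.us-west-2.aws.confluent.cloud"
SR_USER_INFO="SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET"

# GET /subjects lists the subjects registered in the Schema Registry
SR_SUBJECTS_URL="${SR_URL}/subjects"

# Hypothetical helper: defined here but not invoked, so nothing is sent yet
check_schema_registry() {
    # --fail makes curl exit non-zero on an HTTP error such as 401 Unauthorized
    curl --silent --show-error --fail --user "$SR_USER_INFO" "$SR_SUBJECTS_URL"
}

echo "$SR_SUBJECTS_URL"
```

Calling check_schema_registry afterwards should print a JSON array of subject names if the endpoint and API key are valid, and an error otherwise.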