Prompted by a question on StackOverflow, I thought I’d take a quick look at setting up ksqlDB to ingest CDC events from Microsoft SQL Server using Debezium. Some of this is based on my previous article, Streaming data from SQL Server to Kafka to Snowflake ❄️ with Kafka Connect.
Setting up the Docker Compose
I like standalone, repeatable, demo code. For that reason I love using Docker Compose and I embed everything in there - connector installation, the kitchen sink - the works.
You can find the complete Docker Compose file on GitHub.
Installing connectors in ksqlDB without Confluent Hub client
I’ll usually take advantage of the command: stanza in a Docker Compose service to do things like connector installation, as detailed here. In ksqlDB 0.11 the Confluent Hub client is absent, so I’ve had to take a slightly hackier route. If you head over to Confluent Hub and download the connector you want (in this case the Debezium SQL Server connector) you can check the network console to get the direct URL, in this case:
https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip
Now, this URL is liable to change but for now, it works :) (I realise that this runs contrary to making a demo repeatable, but what’s life if we can’t live on the edge a bit)
With this code we can download and install the connector within the ksqlDB Docker container when it spins up:
curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
yum install -y unzip
unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
There’s a wrinkle in the plan here, which is that the latest version of the container runs as a non-root user, and sudo is not installed (no sandwiches for me). To hack around this we elevate the container to run as the root user in the Docker Compose spec:
ksqldb:
image: confluentinc/ksqldb-server:0.11.0
container_name: ksqldb
user: root
…
Now when the container launches it downloads the connector, installs unzip, and unzips the connector archive directly into the plugin.path in which Kafka Connect (running embedded in ksqlDB) will look for it.
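Putting that together, the command: stanza approach looks something like this in the Docker Compose service (a sketch only — I’m assuming /usr/bin/docker/run is this image’s normal launch script):

```yaml
ksqldb:
  image: confluentinc/ksqldb-server:0.11.0
  container_name: ksqldb
  user: root
  command:
    - bash
    - -c
    - |
      # Download and install the Debezium SQL Server connector
      curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
      yum install -y unzip
      unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
      # Then launch ksqlDB as normal (assumed launch script for this image)
      /usr/bin/docker/run
```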
The 'proper' way to do this is either to bake your own ksqlDB image with the connector plugin already installed, or to download the connector to the host machine and mount it into the ksqlDB container. Both of these are fine, but involve more moving parts and more stuff to go wrong than a standalone Docker Compose file, for my purposes :)
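For reference, baking the connector into your own image might look something like this (an untested sketch, reusing the same download URL as above):

```Dockerfile
FROM confluentinc/ksqldb-server:0.11.0
USER root
# Install unzip, fetch the Debezium SQL Server connector, and unpack it
# into the plugin path that embedded Kafka Connect scans
RUN yum install -y unzip && \
    curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip && \
    unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/ && \
    rm /tmp/kafka-connect-mssql.zip
```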
Running the stack
Spin up the Docker Compose file
docker-compose up -d
and then launch ksqlDB - this seemingly complex snippet simply waits for ksqlDB to be available before launching the CLI:
docker exec -it ksqldb bash -c 'echo -e "\n\n Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
In a separate terminal, once ksqlDB has finished starting (i.e. once the ksqlDB CLI starts from the above command), make sure that the MS SQL connector has installed correctly:
docker exec ksqldb curl -s localhost:8083/connector-plugins
You should see
[{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.2.2.Final"}]
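If you’d rather check this programmatically than by eye, the endpoint returns plain JSON. A quick illustrative sketch in Python, parsing the response shown above:

```python
import json

# Response from the /connector-plugins endpoint, as shown above
response = '[{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.2.2.Final"}]'

plugins = json.loads(response)
classes = [p["class"] for p in plugins]

# True if the Debezium SQL Server connector is installed
print("io.debezium.connector.sqlserver.SqlServerConnector" in classes)  # → True
```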
Configuring MS SQL for CDC
When the MS SQL container starts, a couple of scripts are run to set up the database for CDC and add some test data. If you’re not using the Docker Compose file then you need to run these yourself:
USE [master]
GO
CREATE DATABASE demo;
GO
USE [demo]
EXEC sys.sp_cdc_enable_db
GO
-- Run this to confirm that CDC is now enabled:
SELECT name, is_cdc_enabled FROM sys.databases;
GO
USE [demo];
CREATE TABLE demo.dbo.ORDERS ( order_id INT, customer_id INT, order_ts DATE, order_total_usd DECIMAL(5,2), item VARCHAR(50) );
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'ORDERS',
@role_name = NULL,
@supports_net_changes = 0
GO
-- At this point you should get a row returned from this query
SELECT s.name AS Schema_Name, tb.name AS Table_Name , tb.object_id, tb.type, tb.type_desc, tb.is_tracked_by_cdc FROM sys.tables tb INNER JOIN sys.schemas s on s.schema_id = tb.schema_id WHERE tb.is_tracked_by_cdc = 1
GO
-- h/t William Prigol Lopes https://stackoverflow.com/a/61698148/350613
Adding a SQL Server connector in ksqlDB
You should now have the ksqlDB prompt open
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.11.0, Server v0.11.0 located at http://ksqldb:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
From the ksqlDB prompt create the connector:
CREATE SOURCE CONNECTOR SOURCE_MSSQL_ORDERS_01 WITH (
'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname' = 'mssql',
'database.port' = '1433',
'database.user' = 'sa',
'database.password' = 'Admin123',
'database.dbname' = 'demo',
'database.server.name' = 'mssql',
'table.whitelist' = 'dbo.orders',
'database.history.kafka.bootstrap.servers' = 'broker:29092',
'database.history.kafka.topic' = 'dbz_dbhistory.mssql.asgard-01',
'decimal.handling.mode' = 'double'
);
Check that it’s running successfully
SHOW CONNECTORS;
Connector Name | Type | Class | Status
--------------------------------------------------------------------------------------------------------------------
SOURCE_MSSQL_ORDERS_01 | SOURCE | io.debezium.connector.sqlserver.SqlServerConnector | RUNNING (1/1 tasks RUNNING)
--------------------------------------------------------------------------------------------------------------------
If it’s not (e.g. if the Status is WARNING) then run docker logs -f ksqldb and page through to find the ERROR.
Using MS SQL data in ksqlDB
With the connector running and the data flowing, you can declare a stream against it:
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql.dbo.ORDERS', VALUE_FORMAT='AVRO');
and then start to explore the data:
SET 'auto.offset.reset' = 'earliest';
SELECT SOURCE->NAME, SOURCE->SCHEMA + '.' + SOURCE->"TABLE", OP,BEFORE,AFTER FROM ORDERS EMIT CHANGES LIMIT 2;
+-----------------------+-----------------------+-------+---------+-----------------------+
|NAME |KSQL_COL_0 |OP |BEFORE |AFTER |
+-----------------------+-----------------------+-------+---------+-----------------------+
|mssql |dbo.ORDERS |r |null |{ORDER_ID=1, CUSTOMER_I|
| | | | |D=7, ORDER_TS=18256, OR|
| | | | |DER_TOTAL_USD=2.1, ITEM|
| | | | |=Proper Job} |
|mssql |dbo.ORDERS |r |null |{ORDER_ID=2, CUSTOMER_I|
| | | | |D=8, ORDER_TS=18236, OR|
| | | | |DER_TOTAL_USD=0.23, ITE|
| | | | |M=Wainwright} |
Note the use of the -> operator to access the nested fields.
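You may also have spotted that ORDER_TS comes through as an integer (e.g. 18256) rather than a date. With Debezium’s default temporal encoding, a SQL Server DATE column is serialised as the number of days since the Unix epoch. A quick sketch of the conversion (an illustrative helper, not part of the demo):

```python
from datetime import date, timedelta

def epoch_days_to_date(days: int) -> date:
    """Convert Debezium's days-since-epoch encoding of a DATE column."""
    return date(1970, 1, 1) + timedelta(days=days)

print(epoch_days_to_date(18256))  # ORDER_TS from the first row above → 2019-12-26
```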
SELECT AFTER->ORDER_ID, AFTER->CUSTOMER_ID, AFTER->ORDER_TOTAL_USD FROM ORDERS EMIT CHANGES LIMIT 5;
+-------------+--------------+------------------+
|ORDER_ID |CUSTOMER_ID |ORDER_TOTAL_USD |
+-------------+--------------+------------------+
|1 |7 |2.1 |
|2 |8 |0.23 |
|3 |12 |4.3 |
|4 |7 |4.88 |
|5 |14 |3.89 |
Limit Reached
Query terminated
Capturing changes from MS SQL
So far we’ve just seen the snapshot/bootstrap ingest of data from MS SQL into Kafka/ksqlDB. Let’s make some changes in MS SQL and see how they show up in ksqlDB.
Launch the MS SQL CLI
docker exec -it mssql bash -c '/opt/mssql-tools/bin/sqlcmd -l 30 -d demo -S localhost -U sa -P $SA_PASSWORD'
Make some changes to the data
DELETE FROM ORDERS WHERE ORDER_ID=1;
UPDATE ORDERS SET ORDER_TOTAL_USD = ORDER_TOTAL_USD * 0.9 WHERE ORDER_ID =2;
INSERT INTO ORDERS (order_id, customer_id, order_ts, order_total_usd, item) values (9, 5, '2019-11-29T11:10:39Z', '2.24', 'Black Sheep Ale');
GO
Check out the data in ksqlDB
SELECT OP,
SOURCE->SCHEMA + '.' + SOURCE->"TABLE",
BEFORE->ORDER_ID AS B_ORDER_ID,
AFTER->ORDER_ID AS A_ORDER_ID,
BEFORE->ORDER_TOTAL_USD AS B_ORDER_TOTAL_USD,
AFTER->ORDER_TOTAL_USD AS A_ORDER_TOTAL_USD,
BEFORE->ITEM AS B_ITEM,
AFTER->ITEM AS A_ITEM
FROM ORDERS
WHERE NOT OP='r'
EMIT CHANGES;
+-----+-------------+------------+-----------+-------------------+------------------+---------------------+---------------------+
|OP |KSQL_COL_0 |B_ORDER_ID |A_ORDER_ID |B_ORDER_TOTAL_USD |A_ORDER_TOTAL_USD |B_ITEM |A_ITEM |
+-----+-------------+------------+-----------+-------------------+------------------+---------------------+---------------------+
|d |dbo.ORDERS |1 |null |2.1 |null |Proper Job |null |
|u |dbo.ORDERS |2 |2 |0.23 |0.21 |Wainwright |Wainwright |
|c |dbo.ORDERS |null |9 |null |2.24 |null |Black Sheep Ale |
Things to note:

- The d deletion message gives you access to the row state before it was deleted
- The u update message gives you the field values before they were updated (ORDER_TOTAL_USD)
- The c creation message has null values in the BEFORE object because there were no values before the row was created :)
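These op codes come from Debezium’s change event envelope: r for snapshot reads, c for creates, u for updates, and d for deletes. If you were consuming the raw events in your own code (outside of ksqlDB), branching on the op code might look something like this illustrative Python sketch:

```python
def describe_change(event: dict) -> str:
    """Summarise a Debezium change event based on its op code."""
    op = event["op"]
    before, after = event.get("before"), event.get("after")
    if op in ("r", "c"):   # snapshot read / insert: only 'after' is populated
        return f"row now: {after}"
    if op == "u":          # update: both 'before' and 'after' are populated
        return f"row changed from {before} to {after}"
    if op == "d":          # delete: only 'before' is populated
        return f"row deleted, was: {before}"
    raise ValueError(f"unknown op: {op}")

# e.g. the delete of ORDER_ID=1 seen above
print(describe_change({"op": "d", "before": {"ORDER_ID": 1, "ITEM": "Proper Job"}, "after": None}))
```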