This is based on using Confluent Cloud to provide your managed Kafka and Schema Registry. All that you run yourself is the Kafka Connect worker.
Optionally, you can use this Docker Compose to run the worker and a sample MySQL database.
What you need
A Confluent Cloud account, plus API hostnames and keys for both Kafka and Schema Registry. Write these to a .env file:
CCLOUD_BROKER_HOST=
CCLOUD_API_KEY=
CCLOUD_API_SECRET=
CCLOUD_SCHEMA_REGISTRY_URL=
CCLOUD_SCHEMA_REGISTRY_API_KEY=
CCLOUD_SCHEMA_REGISTRY_API_SECRET=
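For illustration, a populated .env looks something like this (all values here are placeholders; note that CCLOUD_SCHEMA_REGISTRY_URL is the bare hostname, since the examples below prepend https:// where needed):

CCLOUD_BROKER_HOST=pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
CCLOUD_API_KEY=XXXXXXXXXXXXXXXX
CCLOUD_API_SECRET=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
CCLOUD_SCHEMA_REGISTRY_URL=psrc-xxxxx.us-east-1.aws.confluent.cloud
CCLOUD_SCHEMA_REGISTRY_API_KEY=XXXXXXXXXXXXXXXX
CCLOUD_SCHEMA_REGISTRY_API_SECRET=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX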
Install the Debezium connector
This article assumes that you're running your own Kafka Connect worker, configured to connect to Confluent Cloud's brokers and Schema Registry.
You need to install the Debezium connector on the Kafka Connect worker:
confluent-hub install --no-prompt debezium/debezium-connector-mysql:0.10.0
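Once the worker has restarted, you can check that the plugin has been loaded (the worker's REST API listens on port 8083 by default):

curl -s localhost:8083/connector-plugins | jq '.[].class'

You should see io.debezium.connector.mysql.MySqlConnector in the list.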
You can also do the installation as part of your Docker Compose:
command:
  - bash
  - -c
  - |
    echo "Installing connector plugins"
    confluent-hub install --no-prompt debezium/debezium-connector-mysql:0.10.0
    #
    echo "Launching Kafka Connect worker"
    /etc/confluent/docker/run &
    #
    sleep infinity
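For reference, here's a minimal sketch of what the worker's Confluent Cloud configuration might look like in the same Docker Compose file. It assumes the confluentinc/cp-kafka-connect image; Compose substitutes the ${…} values from the .env file above, and the standard worker settings (group ID, internal topics, REST port, plugin path) are omitted for brevity:

kafka-connect:
  image: confluentinc/cp-kafka-connect:5.3.1
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "${CCLOUD_BROKER_HOST}"
    CONNECT_SECURITY_PROTOCOL: SASL_SSL
    CONNECT_SASL_MECHANISM: PLAIN
    CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://${CCLOUD_SCHEMA_REGISTRY_URL}"
    CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"
    # The worker's internal producer and consumer need the same SASL_SSL
    # settings, supplied via CONNECT_PRODUCER_* / CONNECT_CONSUMER_* overrides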
Pre-create the topics to which you'll be writing
- Make sure your ccloud environment is using the correct Confluent Cloud cluster:

      $ ccloud kafka cluster list
            Id      |       Name        | Provider |  Region   | Durability | Status
      +-------------+-------------------+----------+-----------+------------+--------+
          lkc-42p8m | pipeline-to-cloud | aws      | us-east-1 | HIGH       | UP
        * lkc-43xgj | race-mapper       | aws      | us-east-1 | LOW        | UP

      $ ccloud kafka cluster use lkc-42p8m
- Create the required topics:
  - The database history topic. The name is set in the configuration property database.history.kafka.topic, and the topic must not be partitioned:

        ccloud kafka topic create --partitions 1 dbz_dbhistory.asgard-01

    If you don't create this topic in advance, Debezium will do so for you, but with a hardcoded timeout of 3 seconds, which is often not long enough in a cloud environment. Hence it's best to create it in advance.
  - The schema change topic. This is enabled by default; set include.schema.changes to false if it's not required. The name is taken from the configuration property database.server.name (asgard). In this example I'm using the RegexRouter Single Message Transform, which prepends a mysql-01- prefix to the topic name too; this is optional. Note that this topic must not be partitioned. Thanks to Terry Franklin for this 👍

        ccloud kafka topic create --partitions 1 mysql-01-asgard
  - One topic per table ingested. The topic name is made up of the database.server.name (asgard), the database name (demo), and the table name. In this example I'm using the RegexRouter Single Message Transform, which prepends a mysql-01- prefix to the topic name too; this is optional.

        ccloud kafka topic create mysql-01-asgard.demo.customers
        ccloud kafka topic create mysql-01-asgard.demo.transactions
Note: If you don't pre-create your topics, you'll get repeating errors in your Kafka Connect worker log. You can create the topics afterwards if you forget, but it's easier to do it up-front.
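Once the topics are created, you can list them to double-check before continuing:

ccloud kafka topic list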
Create the connector
Now create the connector itself, substituting your MySQL details below as indicated. The Confluent Cloud details and credentials will be picked up from the file /data/credentials.properties local to the Kafka Connect worker; if you're using Docker, this can be mapped from the same .env file as above. Or just hardcode the values if you'd prefer 🤷‍.
The configuration is the same as for a normal Debezium connector, except for the additional details that enable the connector to connect to Confluent Cloud for writing and reading the database schema history topic.
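Note that the ${file:…} placeholders below rely on Kafka Connect's FileConfigProvider, which must be enabled on the worker; in worker properties form that looks like this:

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider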
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-debezium-mysql-01/config \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.name": "asgard",
"database.history.kafka.bootstrap.servers": "${file:/data/credentials.properties:CCLOUD_BROKER_HOST}",
"database.history.kafka.topic": "dbz_dbhistory.asgard-01",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm": "https",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:CCLOUD_API_KEY}\" password=\"${file:/data/credentials.properties:CCLOUD_API_SECRET}\";",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.ssl.endpoint.identification.algorithm": "https",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:CCLOUD_API_KEY}\" password=\"${file:/data/credentials.properties:CCLOUD_API_SECRET}\";",
"table.whitelist":"demo.transactions,demo.customers",
"decimal.handling.mode":"double",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mysql-01-$1"
}'
Check that the connector is running:
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | column -s : -t| sed 's/\"//g'| sort
source | source-debezium-mysql-01 | RUNNING | RUNNING | io.debezium.connector.mysql.MySqlConnector
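If the connector or its task shows FAILED instead, inspect the full status object, which includes a stack trace of the error:

curl -s "http://localhost:8083/connectors/source-debezium-mysql-01/status" | jq '.'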
Consume the data
Confluent Cloud GUI
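You can inspect the messages as they arrive using the topic browser in the Confluent Cloud web UI.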
kafkacat
# Set the variables, either from this script or manually
source .env
# Use kafkacat to pull Avro messages from Confluent Cloud
# deserialised using the Schema Registry hosted on Confluent Cloud
docker run --rm edenhill/kafkacat:1.5.0 \
-X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
             -X ssl.ca.location=/etc/ssl/cert.pem -X api.version.request=true \
-b ${CCLOUD_BROKER_HOST} \
-X sasl.username="${CCLOUD_API_KEY}" \
-X sasl.password="${CCLOUD_API_SECRET}" \
-r https://"${CCLOUD_SCHEMA_REGISTRY_API_KEY}":"${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"@${CCLOUD_SCHEMA_REGISTRY_URL} \
-s avro \
-t mysql-01-asgard.demo.transactions \
-C -o beginning
{"txn_id": {"int": 996}, "customer_id": {"int": 4}, "amount": {"double": 69.819999999999993}, "currency": {"string": "CNY"}, "txn_timestamp": {"string": "2018-04-10T10:23:41Z"}}
{"txn_id": {"int": 997}, "customer_id": {"int": 1}, "amount": {"double": 74.170000000000002}, "currency": {"string": "PEN"}, "txn_timestamp": {"string": "2018-11-19T15:29:14Z"}}
{"txn_id": {"int": 998}, "customer_id": {"int": 2}, "amount": {"double": -92.920000000000002}, "currency": {"string": "JPY"}, "txn_timestamp": {"string": "2018-05-25T19:43:48Z"}}
{"txn_id": {"int": 999}, "customer_id": {"int": 1}, "amount": {"double": 71.159999999999997}, "currency": {"string": "EUR"}, "txn_timestamp": {"string": "2018-11-15T07:24:44Z"}}
{"txn_id": {"int": 1000}, "customer_id": {"int": 5}, "amount": {"double": 28.149999999999999}, "currency": {"string": "IRR"}, "txn_timestamp": {"string": "2018-01-12T14:53:49Z"}}
{"txn_id": {"int": 603}, "customer_id": {"int": 4}, "amount": {"double": -85.510000000000005}, "currency": {"string": "CNY"}, "txn_timestamp": {"string": "2018-11-08T22:06:49Z"}}