Kafka Connect exposes a REST interface through which all config and monitoring operations can be done. You can create connectors, delete them, restart them, check their status, and so on. But, I found a situation recently in which I needed to delete a connector and couldn’t do so with the REST API. Here’s another way to do it, by amending the configuration Kafka topic that Kafka Connect in distributed mode uses to persist configuration information for connectors. Note that this is not a recommended way of working with Kafka Connect—the REST API is there for a good reason :)
Let’s imagine we’ve got three connectors running, and we want to get rid of one of them. You can see what’s running using the REST API and some nifty bash work:
curl -s "http://localhost:8083/connectors"| \
jq '.[]'| \
xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
column -s : -t| \
sed 's/\"//g'| \
sort
mongodb-connector | RUNNING | RUNNING
source-rest-weather_york | RUNNING | RUNNING
twitter_source_json_01 | RUNNING | RUNNING
When you configure Kafka Connect you’ll set three Kafka topics for it to use to persist data across the Connect cluster. This will include a topic that holds configurations:
config.storage.topic=docker-connect-configs
So having identified the topic, examine its contents with kafkacat
:
kafkacat -b localhost:9092 \
-t docker-connect-configs \
-C \
-K: \
-f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'
Identify the connector you want to remove, e.g.:
Key (27 bytes): connector-mongodb-connector
Value (195 bytes): {"properties":{"connector.class":"io.debezium.connector.mongodb.MongoDbConnector","mongodb.hosts":"rs0/mongodb:27017","mongodb.name":"ubnt","database.whitelist":"ace","name":"mongodb-connector"}}
Timestamp: 1528723016429 Partition: 0 Offset: 20
Note the key value.
Shutdown Kafka Connect and make sure it’s fully stopped before proceeding.
Now send a tombstone message (NULL
value) for this key
echo "connector-mongodb-connector:" | \
kafkacat -b localhost -t docker-connect-configs -P -Z -K:
If you examine the topic again for this particular key you’ll notice that there are now two messages:
Key (27 bytes): connector-mongodb-connector
Value (195 bytes): {"properties":{"connector.class":"io.debezium.connector.mongodb.MongoDbConnector","mongodb.hosts":"rs0/mongodb:27017","mongodb.name":"ubnt","database.whitelist":"ace","name":"mongodb-connector"}}
Timestamp: 1528723016429 Partition: 0 Offset: 20
--
Key (27 bytes): connector-mongodb-connector
Value (-1 bytes):
Because this is a compacted topic (cleanup.policy=compact
) the latest value for this key is what will be retained when compaction runs—and will be retained for ever. Kafka acts as the permanent store of data. Cool huh.
Start Kafka Connect back up and you will see that the connector is now gone:
source-rest-weather_york | RUNNING | RUNNING
twitter_source_json_01 | RUNNING | RUNNING