When Kafka Connect ingests data from a source system into Kafka, it writes it to a topic. If you have set auto.create.topics.enable = true on your broker then the topic will be created when it is first written to. If auto.create.topics.enable = false (as it is on Confluent Cloud and many self-managed environments, for good reasons) then you can tell Kafka Connect to create those topics first. This capability was added in Apache Kafka 2.6 (Confluent Platform 6.0); prior to that you had to create the topics yourself, otherwise the connector would fail.
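For reference, on a self-managed broker this is a single broker property - a minimal sketch is shown here (on Confluent Cloud the setting is managed for you and can't be changed):

# Broker configuration, e.g. server.properties on a self-managed broker
auto.create.topics.enable=false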
Configuring Kafka Connect to create topics 🔗
Kafka Connect (as of Apache Kafka 2.6) ships with a new worker configuration, topic.creation.enable, which is set to true by default. So long as this is set, you can specify the defaults for new topics to be created by a connector in the connector configuration:
[…]
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
[…]
Without these two settings present in the connector configuration, Kafka Connect will not create the target topic for you.
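For completeness, here's the corresponding worker setting itself - a sketch only, since topic.creation.enable is true by default and doesn't normally need to be set:

# Kafka Connect worker configuration, e.g. connect-distributed.properties
# (defaults to true; shown explicitly here for illustration)
topic.creation.enable=true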
Defining custom topic properties with Kafka Connect source connectors 🔗
When topics are auto-created, they use the defaults configured on the broker for topic creation, including num.partitions and default.replication.factor. There are many other topic-level configurations which you may want to set for topics that are automatically created by Kafka Connect. This is particularly true for connectors which create a large number of topics, or where the topic name is not known in advance (e.g. when using a regex to select objects from the source system) and so the topics cannot be pre-created with the desired settings. Common settings for a topic that you may want to customise include cleanup.policy, min.insync.replicas, and compression.type.
KIP-158 was implemented in Apache Kafka 2.6 (available with Confluent Platform 6.0), adding the ability to customise topic-level configurations for topics created by Kafka Connect source connectors.
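To sketch how this looks, topic-level settings like those above are prefixed with topic.creation.default. in the connector configuration (the values here are purely illustrative):

[…]
"topic.creation.default.replication.factor" : 3,
"topic.creation.default.partitions"         : 6,
"topic.creation.default.cleanup.policy"     : "compact",
"topic.creation.default.min.insync.replicas": 2,
"topic.creation.default.compression.type"   : "lz4",
[…]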
Just the defaults, ma’am. 🔗
Here’s a very simple Kafka Connect source connector, reading data in from a file:
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-txt-file-00/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"topic" : "testdata-00",
"file" : "/data/test.txt"
}'
Since the broker is configured to automagically create new topics (auto.create.topics.enable = true), it does so using the defaults - one partition, replication factor of 1, and so on. We can examine this using various tools:
- kafka-topics

$ kafka-topics --bootstrap-server broker:29092 --topic testdata-00 --describe
Topic: testdata-00 PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: testdata-00 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
- kafkacat

$ kafkacat -b broker:29092 -L -J | jq '.topics[] | select(.topic == "testdata-00")'
{
  "topic": "testdata-00",
  "partitions": [
    {
      "partition": 0,
      "leader": 1,
      "replicas": [ { "id": 1 } ],
      "isrs": [ { "id": 1 } ]
    }
  ]
}
Setting the configuration for auto-created topics 🔗
Let’s see how we can use the new options in Apache Kafka 2.6 (Confluent Platform 6.0) to change some of the configurations that are set when a topic is created.
Attempt 1 … Crashed and burned, Mav 🔗
In my sandbox I just have a single broker, so I’m going to leave the replication factor at a sensible value of 1, but I’m going to change the number of partitions to four, as well as the cleanup policy from its default of delete to compact.
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-txt-file-01/config \
-d '{
"connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topic" : "testdata-01",
"file" : "/data/test.txt",
"topic.creation.default.partitions" : 4,
"topic.creation.default.replication.factor": 1,
"topic.creation.default.cleanup.policy" : "compact"
}'
If you are setting topic creation overrides you must include replication.factor and partitions, even if you’re not specifying a value that’s different from the broker default.
In the broker log you can see that the cleanup.policy configuration has been honoured ({cleanup.policy=compact}):
[2021-01-06 12:03:04,184] INFO Creating topic testdata-01 with configuration {cleanup.policy=compact} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1), 3 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
🤯 ☠️ 💀 But, alas! The connector fails:
[2021-01-06 12:03:04,346] ERROR WorkerSourceTask{id=source-txt-file-01-0} failed to send record to testdata-01: (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.
The FileStreamSourceConnector sends records with no key set, which for a compacted topic makes no sense, and hence we get an org.apache.kafka.common.InvalidRecordException.
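To see the failure from Kafka Connect’s side you can query the REST API’s status endpoint for the connector; the task should be reported as FAILED, with the stack trace in its trace field. A sketch (adjust the host and connector name to your environment):

$ curl -s http://localhost:8083/connectors/source-txt-file-01/status | \
    jq '.tasks[] | {id, state}'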
Attempt 2… I don’t know, but uh, it’s looking good so far. 🔗
Let’s try a different variation just to prove out the topic configuration:
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-txt-file-02/config \
-d '{
"connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topic" : "testdata-02",
"file" : "/data/test.txt",
"topic.creation.default.partitions" : 4,
"topic.creation.default.replication.factor": 1,
"topic.creation.default.compression.type" : "snappy"
}'
In the Kafka Connect worker log you can see the settings used (under the covers it’s done through TopicCreationGroup):
[2021-01-06 12:11:29,256] INFO Created topic '(name=testdata-02, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={compression.type=snappy})' using creation group TopicCreationGroup{name='default', inclusionPattern=.*, exclusionPattern=, numPartitions=4, replicationFactor=1, otherConfigs={compression.type=snappy}} (org.apache.kafka.connect.runtime.WorkerSourceTask)
Checking out the topic details we can see it’s as we wanted it - four partitions, and using snappy compression 💥
$ kafka-topics --bootstrap-server broker:29092 --topic testdata-02 --describe
Topic: testdata-02 PartitionCount: 4 ReplicationFactor: 1 Configs: compression.type=snappy
Topic: testdata-02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-02 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-02 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-02 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
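You can also confirm that compression.type has been set as an override on the topic itself (rather than inherited from the broker defaults) by describing the topic’s configuration with kafka-configs:

$ kafka-configs --bootstrap-server broker:29092 --entity-type topics \
    --entity-name testdata-02 --describe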
Bonus - doing it through ksqlDB 🔗
ksqlDB can be used to create Kafka Connect connectors, either against an existing Kafka Connect cluster or using ksqlDB’s embedded Connect worker. Here’s an example of creating a connector that overrides the min.insync.replicas, partition count, and replication factor for a created topic:
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.14.0-rc732, Server v0.14.0-rc732 located at http://ksqldb:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql> CREATE SOURCE CONNECTOR SOURCE_TXT_FILE_03 WITH (
'connector.class' = 'org.apache.kafka.connect.file.FileStreamSourceConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topic' = 'testdata-03',
'file' = '/data/test.txt',
'topic.creation.default.partitions' = 4,
'topic.creation.default.replication.factor' = 1,
'topic.creation.default.min.insync.replicas' = 1
);
Message
--------------------------------------
Created connector SOURCE_TXT_FILE_03
--------------------------------------
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------------
testdata-03 | 4 | 1
-------------------------------------------------------------------------
ksql> PRINT 'testdata-03' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: KAFKA_STRING
rowtime: 2021/01/06 14:09:27.522 Z, key: <null>, value: Hello world!
Topic details:
$ kafka-topics --bootstrap-server broker:29092 --topic testdata-03 --describe
Topic: testdata-03 PartitionCount: 4 ReplicationFactor: 1 Configs: min.insync.replicas=1
Topic: testdata-03 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-03 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-03 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-03 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic Creation Groups 🔗
In the example above I used just the default topic creation group, but you can create multiple groups of configuration based on the topic name. I can see this being really useful if you want to override topic configuration for just some of the topics that a connector creates but not all of them, or if you want to override configuration for all topics but vary it based on the topic name.
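As a sketch (the group name and the regex here are made up), an additional group is declared in topic.creation.groups and scoped to matching topic names with an include (and optionally exclude) regex; any setting not specified for the group falls back to the default group:

[…]
"topic.creation.groups"                     : "compacted",
"topic.creation.default.replication.factor" : 1,
"topic.creation.default.partitions"         : 4,
"topic.creation.compacted.include"          : "compacted-.*",
"topic.creation.compacted.cleanup.policy"   : "compact",
[…]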