Kafka Connect can be deployed in two modes: Standalone or Distributed. You can learn more about them in my Kafka Summit London 2019 talk.
I usually recommend Distributed for several reasons:
- It can scale
- It is fault-tolerant
- It can be run on a single node sandbox or a multi-node production environment
- It is the same configuration method however you run it
I usually find that Standalone is appropriate when:
- You need to guarantee locality of task execution, such as picking up a log file from a folder on a specific machine
- You don’t care about scale or fault-tolerance ;-)
- You like re-learning how to configure something when you realise that you do care about scale or fault-tolerance X-D
My last snarky point on the list is why I recommend Distributed mode even if you’re just playing around with Kafka Connect on a laptop: you learn it once, and then you’re all set. If you start with Standalone and its method of passing .properties configuration files to the worker at startup, and then come to use Distributed, you have to re-learn how to configure it through the REST interface etc.
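To make the contrast concrete, here is a minimal sketch of sending a connector configuration to a Distributed worker over REST instead of passing a .properties file at startup. It assumes a worker listening on http://localhost:8083 and reuses the source-datagen-01 settings that appear later in this article; the CONNECT_URL variable and the function names are hypothetical, chosen only for illustration.

```shell
# Assumed address of any worker in the cluster; override as needed.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Emit the connector configuration as a flat JSON map.
connector_config() {
  cat <<'EOF'
{
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "item_details_01",
  "max.interval": "250",
  "quickstart": "ratings",
  "tasks.max": "6"
}
EOF
}

# PUT /connectors/<name>/config creates the connector if it does not exist,
# or updates its configuration if it does.
put_connector() {
  connector_config | curl -s -X PUT \
    -H "Content-Type: application/json" \
    --data @- \
    "$CONNECT_URL/connectors/$1/config"
}
```

Against a running worker, put_connector source-datagen-01 would create or update the connector. The same call works unchanged whether the cluster has one worker or many.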
So anyway…a long lead-in to a short article pointing out some of the common mistakes that can be made when setting up multiple Kafka Connect workers in a cluster.
As always, refer to the documentation for more details.
Mistake 1: rest.advertised.host.name set to localhost
The rest.advertised.host.name (or if you’re using Docker, CONNECT_REST_ADVERTISED_HOST_NAME) is the hostname that a Connect worker advertises to the other workers in the cluster, and which they use to contact it. If you set it to localhost then each worker in the cluster will only ever be able to contact itself when you use the REST interface, e.g. to send configuration updates. If the worker happens to be the leader of the Connect cluster then the command will work, but if it’s not then you’ll get this:
{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}
If you front your Kafka Connect workers with a load balancer with a random/round-robin policy then you’ll see the above error "randomly", since you’ll only get it if you happen to be forwarded to a worker that is not the leader.
The second problem with doing this is that even though you might get connectors running successfully (if you send the config REST call to a worker that is the leader), the tasks that run across the cluster will all be identified as running on localhost, which makes it impossible to determine which worker they’re on. Here’s an example of a connector running six tasks across three workers:
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
    jq '."source-datagen-01".status.tasks'
[
  {
    "id": 0,
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  {
    "id": 1,
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  {
    "id": 2,
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  {
    "id": 3,
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  {
    "id": 4,
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  {
    "id": 5,
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  }
]
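A quick way to spot this symptom is to count tasks per worker_id. The following is a small sketch that reads task status JSON on stdin and prints each distinct worker_id followed by the number of times it appears. The function name tasks_per_worker is hypothetical, and the text matching is deliberately crude: it counts every worker_id field it sees, so feed it only the tasks array, as produced by the jq filter above.

```shell
# Count occurrences of each worker_id in Kafka Connect status JSON on stdin.
# Prints "<worker_id> <count>", one line per distinct worker.
tasks_per_worker() {
  grep -o '"worker_id": *"[^"]*"' \
    | sed 's/^"worker_id": *"//; s/"$//' \
    | sort \
    | uniq -c \
    | awk '{ print $2, $1 }'
}
```

Piping the output shown above into tasks_per_worker prints localhost:8083 6. When every task reports the same localhost address on a multi-worker cluster, the advertised host name is the likely culprit.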
FIX: make sure rest.advertised.host.name / CONNECT_REST_ADVERTISED_HOST_NAME is set to a hostname for the worker that the other workers can resolve. If you’ve got a private network (e.g. Docker, VPC, etc) then this is the internal hostname/IP of the worker. It has nothing to do with the external hostname that you might access it by through a load balancer etc.
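As a sanity check before starting a worker, something like the following sketch can flag a loopback value in the worker's properties file. It assumes simple key=value lines (not the full Java properties syntax), and the function names are hypothetical.

```shell
# Print the value of rest.advertised.host.name from a worker properties file.
# If the key appears more than once, the last occurrence is used.
advertised_host() {
  sed -n 's/^[[:space:]]*rest\.advertised\.host\.name[[:space:]]*=[[:space:]]*\(.*[^[:space:]]\)[[:space:]]*$/\1/p' "$1" \
    | tail -n 1
}

# Succeed and print the host if it looks usable by other workers;
# fail with a message on stderr if it is unset or a loopback address.
check_advertised_host() {
  host="$(advertised_host "$1")"
  case "$host" in
    "")
      echo "rest.advertised.host.name is not set explicitly in $1" >&2
      return 2
      ;;
    localhost|127.*|::1)
      echo "rest.advertised.host.name is '$host': other workers would only reach themselves" >&2
      return 1
      ;;
    *)
      echo "$host"
      ;;
  esac
}
```

For a Docker deployment the same idea applies to the CONNECT_REST_ADVERTISED_HOST_NAME environment variable instead of the file.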
Mistake 2: rest.advertised.host.name set to something not resolvable by the workers
This is a variation on the above problem. The rest.advertised.host.name (or if you’re using Docker, CONNECT_REST_ADVERTISED_HOST_NAME) is the hostname that a Connect worker advertises to the other workers in the cluster. It needs to be something that the other workers can resolve.
A problem that can arise is if you set this to an address that may be resolvable outside the Kafka Connect cluster (e.g. an external DNS hostname) but which isn’t within the cluster’s network.
If you do this then, similarly to above, things will work if you send the REST call to the worker that happens to be the leader of the cluster, but if it’s not the leader you’ll get:
{"error_code":500,"message":"IO Error trying to forward REST request: java.net.UnknownHostException: foobar2: Name or service not known"}
(where foobar2 is the hostname of the leader worker of the cluster)
Since Kafka Connect uses Kafka topics to distribute configuration, if you do send the REST call to the leader then it writes the config to the topic which the other workers then pick up - hence the connector will still execute.
FIX: make sure rest.advertised.host.name / CONNECT_REST_ADVERTISED_HOST_NAME is set to a hostname for the worker that the other workers can resolve. If you’ve got a private network (e.g. Docker, VPC, etc) then this is the internal hostname/IP of the worker. It has nothing to do with the external hostname that you might access it by through a load balancer etc.
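One way to test this is to run a name lookup from inside each worker's own network for every other worker's advertised host name. Below is a minimal sketch that assumes a Linux host or container where getent is available; the function names are hypothetical.

```shell
# Succeed if the given host name resolves from where this script runs.
can_resolve() {
  getent hosts "$1" >/dev/null 2>&1
}

# Check each host name given as an argument and report the result.
# Returns non-zero if any of them fails to resolve.
check_peers() {
  status=0
  for host in "$@"; do
    if can_resolve "$host"; then
      echo "OK   $host"
    else
      echo "FAIL $host"
      status=1
    fi
  done
  return "$status"
}
```

Run it on each worker (or inside each container) with the advertised names of the others as arguments. A FAIL line for a name such as foobar2 would correspond to the UnknownHostException shown above. Resolving from your laptop proves nothing, since the lookup has to succeed from the workers' point of view.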
Mistake 3: Sharing the same Kafka topics for different Kafka Connect clusters
Kafka Connect uses Kafka topics to share and persist information about connector configuration, offsets, and the status of tasks. For each Kafka Connect cluster that you run, you need a unique set of three Kafka topics. If you try to share them, even having set a different group.id for your Kafka Connect workers, you’ll find that each cluster will start running the other’s connectors too.
You can see why by examining the config topic; it doesn’t include the group.id in the key for the messages that share the configuration (perhaps it should?), which means that any worker reading from this topic will assume that the configuration is for it to run and share amongst its cluster:
$ kafkacat -b localhost:9092 -t _kafka-connect-configs -o beginning -f 'key: %k, payload: %s\n' -u -C
key: connector-source-datagen-01, payload: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","kafka.topic":"item_details_01","max.interval":"250","quickstart":"ratings","tasks.max":"6","name":"source-datagen-01"}}
key: task-source-datagen-01-0, payload: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
key: task-source-datagen-01-1, payload: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
key: task-source-datagen-01-2, payload: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
key: task-source-datagen-01-3, payload: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
key: task-source-datagen-01-4, payload: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
key: task-source-datagen-01-5, payload: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
key: commit-source-datagen-01, payload: {"tasks":6}
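Since nothing in these keys ties a connector to a particular cluster, it is worth checking that two clusters' worker configurations don't name the same internal topics. The sketch below compares the config.storage.topic, offset.storage.topic and status.storage.topic values from two worker properties files. It assumes simple key=value lines, and the function names and file names are hypothetical.

```shell
# Print the three internal topic names from a worker properties file,
# one per line.
storage_topics() {
  sed -n -E 's/^[[:space:]]*(config|offset|status)\.storage\.topic[[:space:]]*=[[:space:]]*(.*[^[:space:]])[[:space:]]*$/\2/p' "$1"
}

# Print any topic name that appears in both files.
# No output means the two clusters use distinct topics.
shared_topics() {
  {
    storage_topics "$1" | sort -u
    storage_topics "$2" | sort -u
  } | sort | uniq -d
}
```

For example, shared_topics cluster-a.properties cluster-b.properties prints nothing when the two clusters are properly separated, and prints a name such as _kafka-connect-configs if both files point at that topic.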