When you create a sink connector in Kafka Connect, by default it will start reading from the beginning of the topic and stream all of the existing—and new—data to the target. The setting that controls this behaviour is auto.offset.reset
, and you can see its value in the worker log when the connector runs:
[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
…
If you want Kafka Connect to start reading from the end of the topic instead, you need to set auto.offset.reset=latest
. There are two ways to do this:
-
Override it at the Worker level, which impacts all connectors on that worker. That is, your connector will start from the end of the topic, but so will any other connector created on that worker. You could create different workers, but it’s not very elegant.
-
To do this, add the following to your
connect-distributed.properties
(or equivalent if you’re using standalone mode):consumer.auto.offset.reset=latest
-
-
As of Apache Kafka 2.3 (available as part of Confluent Platform 5.3) you can now override consumer (and producer) properties per connector 🙌. Note that this is not enabled by default.
-
To do this you first need to allow it in the worker config:
connector.client.config.override.policy=All
NoteIf you’re using Docker then the configuration is set through the environment variable
CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY
, for example in Docker Compose would look like this:CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: 'All'
-
Once that’s set, you can change the consumer properties that you want to in each connector’s configuration individually. For example:
"consumer.override.auto.offset.reset": "latest"
-
You can check the worker log and you’ll see:
… [2019-08-09 19:08:55,536] INFO ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest …
-
-
If you try to override the consumer/producer configuration and you have not set the policy on the worker as above then it will fail when you try to create the connector:
Connector configuration is invalid and contains the following 1 error(s): The 'None' policy does not allow 'auto.offset.reset' to be overridden in the connector configuration.
-
On Apache Kafka 2.2 and below, the override will simply be ignored.