Kafka Connect in distributed mode uses Kafka itself to persist the offsets of any source connectors. This is a great way to do things as it means that you can easily add more workers, rebuild existing ones, etc without having to worry about where the state is persisted. I personally always recommend using distributed mode, even if just for a single worker instance - it just makes things easier, and more standard. Watch my talk online here to understand more about this. If you want to reset the offset of a source connector then you can do so by very carefully modifying the data in the Kafka topic itself.
Note
|
The offsets of Sink connectors are managed using the Kafka consumer group protocol (see here for how an example of how to reset those) |
In this example I’m using the very simple FileStreamSourceConnector.
curl -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-file-01/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"topic": "testdata",
"file":"/data/testdata.txt"
}'
As I add data to the file, it is ingested by Kafka Connect and written to a topic.
Note
|
This connector is just for testing really; for proper file-based ingestion check out kafka-connect-spooldir. There’s also kafka-connect-file-pulse. |
So now let’s imagine we want to get the connector to re-consume all of the source data. One very simple option is to just rename the connector, which then means it has no existing offsets stored, and thus will start from the beginning again. This can be a bit clumsy though depending on the use case. The alternative is to modify the actual offsets themselves.
The first thing is to determine the Kafka topic being used to persist the offsets. The default is usually connect-offsets
but I’ve taken to overriding this to include an underscore prefix to make it easy to spot an internal topic. Regardless; you can look at your Connect worker config, and/or check the worker log for offset.storage.topic
:
[2019-08-15 08:29:46,585] INFO DistributedConfig values:
[…]
offset.storage.partitions = 25
offset.storage.replication.factor = 1
offset.storage.topic = _connect-offsets
[…]
Now shutdown all connect workers that are using this topic. If you don’t do this then funny things might happen, since connect only periodically flushes offsets to the topic, and doesn’t read them from it except at startup.
With the connect worker shutdown, you can now examine the topic. I’m using kafkacat
here because it is very flexible in how it works with both keys and partitions which will be very important in a moment.
$ kafkacat -b localhost:9092 -t _connect-offsets -C -K#
% Reached end of topic _connect-offsets [0] at offset 0
% Reached end of topic _connect-offsets [1] at offset 0
[…]
["source-file-01",{"filename":"/data/testdata.txt"}]#{"position":87}
[…]
We can see here using the consumer mode (-C
) and a key separator character of #
that the key of the message is the connector’s name plus file
["source-file-01",{"filename":"/data/testdata.txt"}]
and the value of the message is the position offset in the file
{"position":87}
Now use the -f
option of kafkacat
to display this even more clearly, along with a bunch of other important metadata including the partition:
$ kafkacat -b localhost:9092 -t _connect-offsets -C -f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o\n'
% Reached end of topic _connect-offsets [0] at offset 0
% Reached end of topic _connect-offsets [1] at offset 0
[…]
Key (52 bytes): ["source-file-01",{"filename":"/data/testdata.txt"}]
Value (15 bytes): {"position":87}
Timestamp: 1565859303551
Partition: 20
Offset: 0
[…]
Take note of the partition number, because we’ll need that shortly.
Having seen how Kafka Connect stores this data, we can now amend it. For this, the Kafka Connect worker really does need to be shut down. We’re going to prepare our payload which we’ll pass to kafkacat
in a similar way to above. The payload is going to be the key, which remains fixed, and a value. The value can either be a given offset, or it can be NULL
which denotes nothing, nada, nowt—start from scratch. Here’s sending a NULL, which is known as a tombstone message. Very important is that you specify the same target partition (using -p
) for the message as the one that you saw above:
echo '["source-file-01",{"filename":"/data/testdata.txt"}]#' | \
kafkacat -b localhost:9092 -t _connect-offsets -P -Z -K# -p 20
In the echo
we’re specifying the key (["source-file-01",{"filename":"/data/testdata.txt"}]
) followed by the key separator (#
, defined in the kafakcat
argument -K#
), and the -Z
in kafkacat
tells it to send the empty value as a NULL. As stated above, don’t forget to target the correct partition.
Now when we restart the Kafka Connect worker, we can see that the file has been re-processed (note the incrementing offset
value but repeating Value
payloads):
$ kafkacat -b localhost:9092 -t testdata -f 'Value (%S bytes): %s [Kafka message timestamp: %T / offset: %o]\n'
% Auto-selecting Consumer mode (use -P or -C to override)
Value (28 bytes): Thu Aug 15 08:54:18 UTC 2019 [Kafka message timestamp: 1565859259988 / offset: 0]
Value (28 bytes): Thu Aug 15 08:54:23 UTC 2019 [Kafka message timestamp: 1565859263997 / offset: 1]
Value (28 bytes): Thu Aug 15 08:54:26 UTC 2019 [Kafka message timestamp: 1565859267000 / offset: 2]
Value (28 bytes): Thu Aug 15 09:25:45 UTC 2019 [Kafka message timestamp: 1565861146526 / offset: 3]
Value (28 bytes): Thu Aug 15 09:25:58 UTC 2019 [Kafka message timestamp: 1565861158568 / offset: 4]
Value (28 bytes): Thu Aug 15 08:54:18 UTC 2019 [Kafka message timestamp: 1565861628675 / offset: 5]
Value (28 bytes): Thu Aug 15 08:54:23 UTC 2019 [Kafka message timestamp: 1565861628677 / offset: 6]
Value (28 bytes): Thu Aug 15 08:54:26 UTC 2019 [Kafka message timestamp: 1565861628677 / offset: 7]
Value (28 bytes): Thu Aug 15 09:25:45 UTC 2019 [Kafka message timestamp: 1565861628677 / offset: 8]
Value (28 bytes): Thu Aug 15 09:25:58 UTC 2019 [Kafka message timestamp: 1565861628677 / offset: 9]
Note
|
If this doesn’t work for you, make sure you’ve got the partition correct. If you don’t, it won’t work. |