Kafka Connect JDBC Sink - setting the key field name

Published by Robin Moffatt in Kafka Connect, JDBC Sink at https://preview.rmoff.net/2020/02/25/kafka-connect-jdbc-sink-setting-the-key-field-name/

I wanted to get some data from a Kafka topic:

ksql> PRINT PERSON_STATS FROM BEGINNING;
Key format: KAFKA (STRING)
Value format: AVRO
rowtime: 2/25/20 1:12:51 PM UTC, key: robin, value: {"PERSON": "robin", "LOCATION_CHANGES": 1, "UNIQUE_LOCATIONS": 1}

into Postgres, so I did the easy thing and used Kafka Connect with the JDBC Sink connector.

I wanted to use UPSERT behaviour based on the key of the Kafka message (as shown above, robin), so I set pk.mode = record_key. Unfortunately this didn't work, and the connector failed with:

Need exactly one PK column defined since the key schema for records is a primitive type, defined columns are: []
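
For context, the connector had been created along the lines of the following sketch. The connector name, connection details, and converter setting are illustrative placeholders rather than the exact values used here, and it assumes the Connect worker's default value converter already handles Avro:

CREATE SINK CONNECTOR SINK_POSTGRES WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'PERSON_STATS',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'insert.mode'         = 'upsert',
    'pk.mode'             = 'record_key'
);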

Hmmmmm, I was puzzled. How can I specify a field name for something that’s the message key?

It turns out that if the message key is a primitive type (rather than a struct with named fields), you need to tell the connector the name of the column to create for it on the target table, and that is what pk.fields is for.

So this works:

'pk.mode'   = 'record_key',
'pk.fields' = 'PERSON'
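
Slotted into the full connector definition (with the same placeholder connection details as in the sketch above), that gives:

CREATE SINK CONNECTOR SINK_POSTGRES WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'PERSON_STATS',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'insert.mode'         = 'upsert',
    'pk.mode'             = 'record_key',
    'pk.fields'           = 'PERSON'
);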

The connector then creates a table in the target database like this:

postgres=# \d "PERSON_STATS"
                Table "public.PERSON_STATS"
      Column      |  Type  | Collation | Nullable | Default
------------------+--------+-----------+----------+---------
 PERSON           | text   |           | not null |
 LOCATION_CHANGES | bigint |           |          |
 UNIQUE_LOCATIONS | bigint |           |          |
Indexes:
    "PERSON_STATS_pkey" PRIMARY KEY, btree ("PERSON")

The data in the table then updates in place as new messages arrive on the Kafka topic.
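
To see the upsert in action, query the row in Postgres as new messages arrive on the topic; the counts change in place rather than new rows being added. The values shown here match the example message at the top of the post:

postgres=# SELECT * FROM "PERSON_STATS" WHERE "PERSON" = 'robin';
 PERSON | LOCATION_CHANGES | UNIQUE_LOCATIONS
--------+------------------+------------------
 robin  |                1 |                1
(1 row)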