I wanted to get some data from a Kafka topic:
ksql> PRINT PERSON_STATS FROM BEGINNING;
Key format: KAFKA (STRING)
Value format: AVRO
rowtime: 2/25/20 1:12:51 PM UTC, key: robin, value: {"PERSON": "robin", "LOCATION_CHANGES": 1, "UNIQUE_LOCATIONS": 1}
into Postgres, so I did the easy thing and used Kafka Connect with the JDBC Sink connector.
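For reference, the sink can be created from ksqlDB along these lines. This is just a sketch: the connector name, connection details, and Schema Registry URL here are illustrative rather than the exact config I used.

CREATE SINK CONNECTOR SINK_POSTGRES_PERSON_STATS WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/postgres',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'PERSON_STATS',
    -- The key is a plain string and the value is Avro (as the PRINT output above shows),
    -- hence the two different converters
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true'
);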
I wanted to use UPSERT behaviour based on the key of the Kafka message (as shown above, robin), so I set pk.mode = record_key. Unfortunately this didn't work, and the connector errored out with:
Need exactly one PK column defined since the key schema for records is a primitive type, defined columns are: []
Hmmmmm, I was puzzled. How can I specify a field name for something that’s the message key?
Turns out that if your message key is a primitive type, you need to specify the name of the field that it should be written to on the target table.
So this works:
…
pk.mode = record_key
pk.fields = 'PERSON'
…
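One thing to note alongside this: pk.mode and pk.fields only tell the connector where to get the primary key from. To get true UPSERT behaviour on the target table (the connector defaults to plain inserts), insert.mode also needs setting, along the lines of:

…
insert.mode = 'upsert'
…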
The connector then creates a table in the target database like this:
postgres=# \d "PERSON_STATS"
              Table "public.PERSON_STATS"
      Column      |  Type  | Collation | Nullable | Default
------------------+--------+-----------+----------+---------
 PERSON           | text   |           | not null |
 LOCATION_CHANGES | bigint |           |          |
 UNIQUE_LOCATIONS | bigint |           |          |
Indexes:
    "PERSON_STATS_pkey" PRIMARY KEY, btree ("PERSON")
with data that updates in place as changes are made to the topic in Kafka.
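You can see this by re-running a simple query against the table as new messages arrive on the PERSON_STATS topic; the existing row for a given person gets updated, rather than a new row being appended:

postgres=# SELECT * FROM "PERSON_STATS" WHERE "PERSON" = 'robin';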