I got the error SQLSyntaxErrorException: BLOB/TEXT column 'MESSAGE_KEY' used in key specification without a key length with the Kafka Connect JDBC Sink connector (v10.0.2) and MySQL (8.0.23):
[2021-03-11 11:07:32,627] ERROR WorkerSinkTask{id=sink-jdbc-mysql-02-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.SQLSyntaxErrorException: BLOB/TEXT column 'MESSAGE_KEY' used in key specification without a key length
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:101)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
Looking further up the log, I could see that this exception came from this unfortunate series of events:
INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter)
INFO Checking MySql dialect for existence of TABLE "test_rmoff2" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
INFO Using MySql dialect TABLE "test_rmoff2" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
INFO Creating table with sql: CREATE TABLE `test_rmoff2` (
`ID` TEXT NULL,
`Artist` TEXT NULL,
`Song` TEXT NULL,
`MESSAGE_KEY` TEXT NOT NULL,
PRIMARY KEY(`MESSAGE_KEY`)) (io.confluent.connect.jdbc.sink.DbStructure)
This comes about because my JDBC Sink connector was set to auto.create=true with pk.mode=record_key and pk.fields=MESSAGE_KEY. This meant that the primary key for the table was coming from the key of the Kafka message and being stored in a field called MESSAGE_KEY.
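For context, the relevant part of the sink connector configuration looked something along these lines. This is a sketch rather than my exact config: the connection URL, topic name, and key converter shown here are illustrative assumptions, and it's the last three settings that matter for this error:

# Sketch of the relevant JDBC sink settings.
# connection.url, topics, and key.converter are illustrative assumptions;
# auto.create, pk.mode, and pk.fields are the settings discussed above.
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:mysql://mysql:3306/demo
topics=test_rmoff2
key.converter=org.apache.kafka.connect.storage.StringConverter
auto.create=true
pk.mode=record_key
pk.fields=MESSAGE_KEY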
The key for the Kafka message was just a string (a primitive). You can see from the source code of the connector that it maps the Kafka Connect STRING type to TEXT in MySQL (previously it was VARCHAR(256)). Defining the field as a primary key in MySQL requires it to have a fixed length, not just TEXT.
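To see the underlying MySQL behaviour in isolation (these statements are just an illustration against MySQL 8.0/InnoDB, not DDL that the connector generates, and the table names are throwaway): a TEXT column can only appear in a key specification if you give it a key length, whereas a column with a bounded length such as VARCHAR works as-is:

-- Fails with "BLOB/TEXT column 'MESSAGE_KEY' used in key specification without a key length"
CREATE TABLE t_text_pk (MESSAGE_KEY TEXT NOT NULL, PRIMARY KEY (MESSAGE_KEY));

-- Accepted: a key length (index prefix) is supplied for the TEXT column
CREATE TABLE t_text_prefix_pk (MESSAGE_KEY TEXT NOT NULL, PRIMARY KEY (MESSAGE_KEY(255)));

-- Accepted: the column itself has a bounded length
CREATE TABLE t_varchar_pk (MESSAGE_KEY VARCHAR(255) NOT NULL, PRIMARY KEY (MESSAGE_KEY));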
Workarounds:
- Use version 5.5.3 of the connector, in which STRING is still mapped to VARCHAR(256) (from the download page click on Download previous versions)
- Create the table in MySQL ahead of populating it with the JDBC sink connector, setting the field's data type appropriately (e.g. VARCHAR(255) if that's long enough for the data):
  CREATE TABLE `test_rmoff2` (
    `ID` TEXT NULL,
    `Artist` TEXT NULL,
    `Song` TEXT NULL,
    `MESSAGE_KEY` VARCHAR(255) NOT NULL,
    PRIMARY KEY(`MESSAGE_KEY`));