ksqlDB 0.7 will add support for message keys as primitive data types beyond just STRING (which is all we’ve had to date). That means that Kafka messages are going to be much easier to work with, and require less wrangling to get into the form in which you need them. Take the example of a database table that you’ve ingested into a Kafka topic and want to join to a stream of events. Previously you’d have had to take the Kafka topic into which the table had been ingested and run a ksqlDB processor to re-key the messages so that ksqlDB could join on them. Friends, I am here to tell you that this is no longer needed!
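For reference, the old approach went something like this rough sketch: declare a stream over the source topic, re-key it with PARTITION BY, and then declare a table over the re-keyed topic (the stream, table, and topic names here are just illustrative):

-- A sketch of the pre-0.7 re-keying dance (all names here are illustrative)
-- 1. Declare a stream over the topic that the table was ingested into
CREATE STREAM CUSTOMERS_SRC WITH (KAFKA_TOPIC='my_db_topic', VALUE_FORMAT='AVRO');

-- 2. Re-key the messages on the column that you want to join on
CREATE STREAM CUSTOMERS_REKEYED AS
    SELECT * FROM CUSTOMERS_SRC
    PARTITION BY ID;

-- 3. Declare a table over the re-keyed topic (which by default takes the name of the new stream)
CREATE TABLE CUSTOMERS_TABLE WITH (KAFKA_TOPIC='CUSTOMERS_REKEYED', VALUE_FORMAT='AVRO');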
Note: ksqlDB 0.7 is not yet (7 February 2020) released - my notes here are based on the latest build from master on GitHub. You can find Docker images for the latest build here: https://hub.docker.com/r/rmoff/ksqldb-server
Let’s take the example from above, ingesting data from a database. I’m going to use Debezium (of course), and stream in data from MySQL. Create the connector using the Kafka Connect REST API:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-00/config \
-d '{
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"database.hostname" : "mysql",
"database.port" : "3306",
"database.user" : "debezium",
"database.password" : "dbz",
"database.server.id" : "42",
"database.server.name" : "asgard",
"table.whitelist" : "demo.customers",
"database.history.kafka.bootstrap.servers" : "kafka:29092",
"database.history.kafka.topic" : "dbhistory.demo" ,
"include.schema.changes" : "false",
"transforms": "unwrap,extractkey",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.extractkey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractkey.field": "id",
"key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}'
Check that it’s running:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
source | source-debezium-mysql-00 | RUNNING | RUNNING | io.debezium.connector.mysql.MySqlConnector
Now let’s peel back the covers just a bit. The extremely eagle-eyed amongst you will have noticed that in the connector above I specified:
"key.converter": "org.apache.kafka.connect.converters.IntegerConverter"
This uses the new set of primitive converters that were added in KIP-305 as part of Apache Kafka 2.0. So Debezium will take the primary key of the table (id) and set it as the key of the message, in a struct. I use the ExtractField Single Message Transform (SMT) to lift this out of the struct and write it as the key of the Kafka message as a signed 32-bit integer with the IntegerConverter converter.
I also use the ExtractNewRecordState SMT to flatten the value part of the message to just the current DB record state.
We can inspect the payload, which shows that things are working as we want them to. I’m using the -s setting to specify the serde for reading the key and value:
kafkacat -b kafka:29092 \
-t asgard.demo.CUSTOMERS \
-C \
-c1 \
-s key=i \
-s value=avro \
-r http://schema-registry:8081 \
-f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n'
Topic asgard.demo.CUSTOMERS / Partition 0 / Offset: 2 / Timestamp: 1581069692127
Headers:
Key (4 bytes): 3
Payload (155 bytes): {"id": 3, "first_name": {"string": "Mariejeanne"}, "last_name": {"string": "Cocci"}, "email": {"string": "[email protected]"}, "gender": {"string": "Female"}, "club_status": {"string": "bronze"}, "comments": {"string": "Multi-tiered bandwidth-monitored capability"}, "create_ts": {"string": "2020-02-07T09:35:27Z"}, "update_ts": {"string": "2020-02-07T09:35:27Z"}}
--
What are all these arguments for kafkacat?

- -b is the broker connection, and -t is the topic
- -C means run as a consumer, and -c1 means just read one message and then exit
- -f is the format string to use when rendering the message - here we’re showing a ton of useful metadata as well as the key and value themselves.
- -s tells kafkacat how to deserialise the message’s key and/or value:
  - key=i deserialises the key as a signed 32-bit integer
  - value=avro deserialises the value as Avro, using the Schema Registry specified in -r to fetch the schema
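If you don’t have kafkacat to hand, a rough equivalent from the ksqlDB CLI is the PRINT statement, which makes a best-effort guess at the key and value formats rather than you specifying them (the LIMIT clause just stops it after one message):

PRINT 'asgard.demo.CUSTOMERS' FROM BEGINNING LIMIT 1;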
Use it in ksqlDB
Note: ksqlDB 0.7 is not yet (7 February 2020) released - my notes here are based on the latest build from master on GitHub. You can find Docker images for the latest build here: https://hub.docker.com/r/rmoff/ksqldb-server
So we’ve got the Kafka topic populated correctly. Now we can declare a table over it in ksqlDB, using the new ROWKEY … KEY syntax with the appropriate primitive type.
Note that at the moment you have to declare the Avro schema explicitly if you are declaring the key’s type.
CREATE TABLE CUSTOMERS (ROWKEY INT KEY,
FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR, GENDER VARCHAR, CLUB_STATUS VARCHAR, COMMENTS VARCHAR, CREATE_TS VARCHAR, UPDATE_TS VARCHAR)
WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS',
VALUE_FORMAT='AVRO');
Check out the schema - note the INTEGER key:
ksql> DESCRIBE CUSTOMERS;
Name : CUSTOMERS
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | INTEGER (system)
FIRST_NAME | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
EMAIL | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
CLUB_STATUS | VARCHAR(STRING)
COMMENTS | VARCHAR(STRING)
CREATE_TS | VARCHAR(STRING)
UPDATE_TS | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
Query the TABLE:
ksql> SELECT * FROM CUSTOMERS EMIT CHANGES LIMIT 5;
+----------------+---------+-------------+-----------+------------------------+--------+------------+--------------------------+--------------------------+--------------------------+
|ROWTIME |ROWKEY |FIRST_NAME |LAST_NAME |EMAIL |GENDER |CLUB_STATUS |COMMENTS |CREATE_TS |UPDATE_TS |
+----------------+---------+-------------+-----------+------------------------+--------+------------+--------------------------+--------------------------+--------------------------+
|1581069692127 |1 |Rica |Blaisdell |rblaisdell0@rambler.ru |Female |bronze |Universal optimal hierarch|2020-02-07T09:35:27Z |2020-02-07T09:35:27Z |
| | | | | | | |y | | |
|1581069692127 |2 |Ruthie |Brockherst |rbrockherst1@ow.ly |Female |platinum |Reverse-engineered tangibl|2020-02-07T09:35:27Z |2020-02-07T09:35:27Z |
| | | | | | | |e interface | | |
|1581069692127 |3 |Mariejeanne |Cocci |mcocci2@techcrunch.com |Female |bronze |Multi-tiered bandwidth-mon|2020-02-07T09:35:27Z |2020-02-07T09:35:27Z |
| | | | | | | |itored capability | | |
|1581069692128 |4 |Hashim |Rumke |hrumke3@sohu.com |Male |platinum |Self-enabling 24/7 firmwar|2020-02-07T09:35:27Z |2020-02-07T09:35:27Z |
| | | | | | | |e | | |
|1581069692128 |5 |Hansiain |Coda |hcoda4@senate.gov |Male |platinum |Centralized full-range app|2020-02-07T09:35:27Z |2020-02-07T09:35:27Z |
| | | | | | | |roach | | |
Limit Reached
Query terminated
Now let’s take a stream of events that have a foreign key (USER_ID) to the customer data above:
ksql> DESCRIBE RATINGS;
Name : RATINGS
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
RATING_ID | BIGINT
USER_ID | INTEGER
STARS | INTEGER
ROUTE_ID | INTEGER
RATING_TIME | BIGINT
CHANNEL | VARCHAR(STRING)
MESSAGE | VARCHAR(STRING)
-----------------------------------------
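(If you’re wondering where RATINGS came from: a stream like this can be declared straight over an existing Avro topic without listing the columns, since ksqlDB fetches the value schema from the Schema Registry - the topic name below is an assumption on my part:)

CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');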
ksql> SELECT USER_ID, STARS, MESSAGE FROM RATINGS EMIT CHANGES;
+----------+--------+------------------------------------------------------------------------------+
|USER_ID |STARS |MESSAGE |
+----------+--------+------------------------------------------------------------------------------+
|10 |4 |your team here rocks! |
|6 |1 |more peanuts please |
|19 |4 |why is it so difficult to keep the bathrooms clean ? |
|18 |3 |Exceeded all my expectations. Thank you ! |
|1 |1 |more peanuts please |
…
Join this stream to the customer data, on the common key:
ksql> SELECT C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER,
R.STARS,
R.MESSAGE
FROM RATINGS R
INNER JOIN CUSTOMERS C
ON R.USER_ID = C.ROWKEY
EMIT CHANGES;
+----------------+-------+-----------------------------------------------------+
|CUSTOMER |STARS |MESSAGE |
+----------------+-------+-----------------------------------------------------+
|Brena Tollerton |4 |your team here rocks! |
|Robinet Leheude |1 |more peanuts please |
|Josiah Brockett |4 |why is it so difficult to keep the bathrooms clean ? |
|Waldon Keddey |3 |Exceeded all my expectations. Thank you ! |
|Rica Blaisdell |1 |more peanuts please |
…
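To keep this enriched data around rather than just querying it interactively, the same join can be wrapped in a CREATE STREAM … AS SELECT, which writes the results continually to a new Kafka topic (the stream name here is my own):

CREATE STREAM RATINGS_WITH_CUSTOMER_DATA AS
    SELECT C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER,
           R.STARS,
           R.MESSAGE
    FROM RATINGS R
         INNER JOIN CUSTOMERS C
         ON R.USER_ID = C.ROWKEY;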
What if I’m using ksqlDB <0.7 (Confluent Platform <5.5)? (or if I don’t want to type in the whole value schema if it’s in Avro?)
The option you’ve got here is to serialise the key as a string, and then in ksqlDB force the foreign key to the same type.
Here’s a new version of the connector, using the StringConverter. Note that it’s still using the ExtractField$Key SMT.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-02/config \
-d '{
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"database.hostname" : "mysql",
"database.port" : "3306",
"database.user" : "debezium",
"database.password" : "dbz",
"database.server.id" : "43",
"database.server.name" : "asgard2",
"table.whitelist" : "demo.customers",
"database.history.kafka.bootstrap.servers" : "kafka:29092",
"database.history.kafka.topic" : "dbhistory.demo" ,
"include.schema.changes" : "false",
"transforms": "unwrap,extractkey",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.extractkey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractkey.field": "id",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}'
Create a new version of the table - note that we don’t have to enter the full schema :)
ksql> CREATE TABLE CUSTOMERS2 WITH (KAFKA_TOPIC='asgard2.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
Message
---------------
Table created
---------------
ksql> DESCRIBE CUSTOMERS2;
Name : CUSTOMERS2
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ID | INTEGER
FIRST_NAME | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
EMAIL | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
CLUB_STATUS | VARCHAR(STRING)
COMMENTS | VARCHAR(STRING)
CREATE_TS | VARCHAR(STRING)
UPDATE_TS | VARCHAR(STRING)
-----------------------------------------
Now we work around the fact that the foreign key USER_ID is an INT on the events we’re joining to, whilst ROWKEY is a STRING on the table (per the DESCRIBE output above), by CASTing the datatype on the left-hand side of the join:
ksql> SELECT C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER,
R.STARS,
R.MESSAGE
FROM RATINGS R
INNER JOIN CUSTOMERS2 C
ON CAST(R.USER_ID AS STRING) = C.ROWKEY
EMIT CHANGES;
+-----------------+-------+-----------------------------------------------------+
|CUSTOMER |STARS |MESSAGE |
+-----------------+-------+-----------------------------------------------------+
|Brena Tollerton |4 |your team here rocks! |
|Robinet Leheude |1 |more peanuts please |
|Josiah Brockett |4 |why is it so difficult to keep the bathrooms clean ? |
|Waldon Keddey |3 |Exceeded all my expectations. Thank you ! |
|Rica Blaisdell |1 |more peanuts please |
…
But my Single Message Transform doesn’t work…
With the Debezium connector and the ExtractField$Key SMT you might hit this error when you run the connector:
java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
This is detailed here, but in short you need to make sure that you’ve set the following in the Debezium connector config:
"include.schema.changes": "false",