As you may already realise, Kafka is not just a fancy message bus, or a pipe for big data. It’s an event streaming platform! If this is news to you, I’ll wait here whilst you read this or watch this…
One of the neat things that Kafka does with its messages is the concept of tombstone messages. These are messages with a null value. They’re usually used in conjunction with a key to indicate the logical deletion of a record. This could be within a Kafka topic itself in the case of compacted topics, or when used with Kafka Connect and sink connectors that support this semantic such as Elasticsearch or JDBC Sink.
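To make the idea concrete, here’s roughly what producing a tombstone by hand looks like with kafkacat (just a sketch - the broker address and topic name here are placeholders, not anything used later in this article):
# Produce a message with key 42 and a NULL value (a tombstone).
# -K: splits the key and value on ':', and -Z sends the empty value as NULL.
$ echo "42:" | kafkacat -P -b localhost:9092 -t some_topic -K: -Z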
Here I’m going to show you how you can use tombstone messages with ksqlDB too. It’s not supported as a first-class concept yet, but it is possible with a bit of a Heath Robinson approach.
The scenario 🔗
You’ve got a source database in which a field indicates the logical deletion of a record. You want to make that a hard deletion when the data is streamed to Elasticsearch.
Setup 🔗
Source database 🔗
Create and populate the source table in Postgres:
CREATE TABLE orders (order_id INT,
order_total_usd DECIMAL(5,2),
item VARCHAR(50),
cancelled_ind BOOLEAN,
update_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
INSERT INTO orders VALUES (1,12.34,'Cake',false);
INSERT INTO orders VALUES (2,56.78,'More Cake',true);
INSERT INTO orders VALUES (3,910.11,'Franzbrötchen',false);
Data:
postgres=# SELECT * FROM orders;
order_id | order_total_usd | item | cancelled_ind | update_ts
----------+-----------------+---------------+---------------+----------------------------
1 | 12.34 | Cake | f | 2020-11-03 21:52:52.39298
2 | 56.78 | More Cake | t | 2020-11-03 21:52:52.396852
3 | 910.11 | Franzbrötchen | f | 2020-11-03 21:52:52.885457
(3 rows)
Create a trigger to update the timestamp on modification, so that CDC can identify the changed row (learn more here):
-- Courtesy of https://techblog.covermymeds.com/databases/on-update-timestamps-mysql-vs-postgres/
CREATE FUNCTION public.update_updated_at_column() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
NEW.update_ts = NOW();
RETURN NEW;
END;
$$;
CREATE TRIGGER customers_updated_at_modtime BEFORE UPDATE ON orders FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
Test the trigger:
postgres=# update orders set item='Chocolate Cake' where order_id=1;
UPDATE 1
postgres=# SELECT * FROM orders;
order_id | order_total_usd | item | cancelled_ind | update_ts
----------+-----------------+----------------+---------------+----------------------------
2 | 56.78 | More Cake | t | 2020-11-03 21:52:52.396852
3 | 910.11 | Franzbrötchen | f | 2020-11-03 21:52:52.885457
1 | 12.34 | Chocolate Cake | f | 2020-11-03 21:53:49.680009
Ingest into Kafka 🔗
This uses the Kafka Connect JDBC Source connector. You can also use tools like Debezium for ingesting from RDBMS into Kafka. This talk covers the different tools and approaches available.
I’m using ksqlDB to create the connector, but it’s using Kafka Connect in the background, and you can use the Kafka Connect REST API directly if you’d rather.
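If you do want to go the REST route, the equivalent call would look something like this - just a sketch, and it assumes the Connect worker’s REST interface is reachable on localhost:8083 (the connector configuration itself mirrors the ksqlDB statement that follows):
$ curl -s -X PUT http://localhost:8083/connectors/SOURCE_PG01/config \
    -H "Content-Type: application/json" \
    -d '{
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:postgresql://postgres:5432/",
          "connection.user": "postgres",
          "connection.password": "postgres",
          "poll.interval.ms": "1000",
          "mode": "timestamp",
          "table.whitelist": "orders",
          "topic.prefix": "",
          "timestamp.column.name": "update_ts",
          "validate.non.null": "false",
          "numeric.mapping": "best_fit",
          "transforms": "copyFieldToKey,extractKeyFromStruct,removeKeyFromValue",
          "transforms.copyFieldToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
          "transforms.copyFieldToKey.fields": "order_id",
          "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.extractKeyFromStruct.field": "order_id",
          "transforms.removeKeyFromValue.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
          "transforms.removeKeyFromValue.blacklist": "order_id",
          "key.converter": "org.apache.kafka.connect.converters.IntegerConverter"
        }'
Here, though, I’ll create it from ksqlDB: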
ksql> CREATE SOURCE CONNECTOR SOURCE_PG01 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'poll.interval.ms' = 1000,
'mode' = 'timestamp',
'table.whitelist' = 'orders',
'topic.prefix' = '',
'timestamp.column.name' = 'update_ts',
'validate.non.null' = false,
'numeric.mapping' = 'best_fit',
'transforms' = 'copyFieldToKey,extractKeyFromStruct,removeKeyFromValue',
'transforms.copyFieldToKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.copyFieldToKey.fields' = 'order_id',
'transforms.extractKeyFromStruct.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractKeyFromStruct.field' = 'order_id',
'transforms.removeKeyFromValue.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
'transforms.removeKeyFromValue.blacklist'= 'order_id',
'key.converter' = 'org.apache.kafka.connect.converters.IntegerConverter');
Message
-------------------------------
Created connector SOURCE_PG01
-------------------------------
See this explanation for the transforms, but it’s basically setting the order_id as the key of the message.
ksql> SHOW CONNECTORS;
Connector Name | Type | Class | Status
-------------------------------------------------------------------------------------------------------
SOURCE_PG01 | SOURCE | io.confluent.connect.jdbc.JdbcSourceConnector | RUNNING (1/1 tasks RUNNING)
-------------------------------------------------------------------------------------------------------
ksql>
Check the data as it arrives - note that the key is the order_id value, and it is not repeated in the value part of the message:
ksql> PRINT 'orders' FROM BEGINNING;
Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO
rowtime: 2020/11/03 21:59:23.628 Z, key: 2, value: {"order_total_usd": 56.78, "item": "More Cake", "cancelled_ind": true, "update_ts": 1604440372396}
rowtime: 2020/11/03 21:59:23.629 Z, key: 3, value: {"order_total_usd": 910.11, "item": "Franzbrötchen", "cancelled_ind": false, "update_ts": 1604440372885}
rowtime: 2020/11/03 21:59:23.629 Z, key: 1, value: {"order_total_usd": 12.34, "item": "Chocolate Cake", "cancelled_ind": false, "update_ts": 1604440429680}
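As a quick sanity check you can also confirm that the connector created the topic (and see its partition count) with kafkacat’s metadata listing - a sketch, reusing the kafkacat container that appears again later in this article:
# -L lists cluster metadata; -t restricts it to the orders topic
$ docker exec kafkacat kafkacat -b broker:29092 -L -t orders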
Modelling the source data as a ksqlDB stream 🔗
CREATE STREAM ORDERS_SRC (ORDER_ID INT KEY) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM ORDERS_SRC EMIT CHANGES LIMIT 3;
+----------+----------------+----------------+---------------+---------------+
|ORDER_ID |ORDER_TOTAL_USD |ITEM |CANCELLED_IND |UPDATE_TS |
+----------+----------------+----------------+---------------+---------------+
|2 |56.78 |More Cake |true |1604440372396 |
|3 |910.11 |Franzbrötchen |false |1604440372885 |
|1 |12.34 |Chocolate Cake |false |1604440429680 |
Limit Reached
Query terminated
Processing the data 🔗
First we populate a new topic with messages from the source in which the order has not been logically deleted (WHERE CANCELLED_IND = FALSE):
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> CREATE STREAM ORDERS_NOT_DELETED
WITH (KAFKA_TOPIC='orders_processed', VALUE_FORMAT='AVRO') AS
SELECT * FROM ORDERS_SRC
WHERE CANCELLED_IND = FALSE;
Examine the output topic and note that the logically-deleted order is not present:
ksql> PRINT orders_processed;
Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO
rowtime: 2020/11/03 21:59:23.629 Z, key: 3, value: {"ORDER_TOTAL_USD": 910.11, "ITEM": "Franzbrötchen", "CANCELLED_IND": false, "UPDATE_TS": 1604440372885}
rowtime: 2020/11/03 21:59:23.629 Z, key: 1, value: {"ORDER_TOTAL_USD": 12.34, "ITEM": "Chocolate Cake", "CANCELLED_IND": false, "UPDATE_TS": 1604440429680}
Now we do the fiddly bit - write a null for the value if the order has been logically deleted (WHERE CANCELLED_IND = TRUE). Most importantly, we use the KAFKA serialisation format (see below for an explanation why).
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> CREATE STREAM ORDERS_DELETED
WITH (KAFKA_TOPIC='orders_processed', VALUE_FORMAT='KAFKA') AS
SELECT ORDER_ID, CAST(NULL AS VARCHAR) FROM ORDERS_SRC
WHERE CANCELLED_IND = TRUE;
This bit might look a bit odd: CAST(NULL AS VARCHAR). It’s necessary because ksqlDB needs a datatype even if the value is going to be NULL. Without it you might hit this error.
Now when we look at the topic we can see a tombstone message for order_id=2:
ksql> PRINT orders_processed;
Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO
rowtime: 2020/11/03 21:59:23.629 Z, key: 3, value: {"ORDER_TOTAL_USD": 910.11, "ITEM": "Franzbrötchen", "CANCELLED_IND": false, "UPDATE_TS": 1604440372885}
rowtime: 2020/11/03 21:59:23.629 Z, key: 1, value: {"ORDER_TOTAL_USD": 12.34, "ITEM": "Chocolate Cake", "CANCELLED_IND": false, "UPDATE_TS": 1604440429680}
rowtime: 2020/11/03 21:59:23.628 Z, key: 2, value: <null>
If you don’t quite believe me, we can double-check with kafkacat:
$ docker exec kafkacat kafkacat -b broker:29092 -t orders_processed -J -C -u | jq '{key, payload}'
{
"key": "\u0000\u0000\u0000\u0003",
"payload": "\u0000\u0000\u0000\u0000\u0003\u0002{\u0014�G�p�@\u0002\u001cFranzbrötchen\u0002\u0000\u0002��Ɂ�]"
}
{
"key": "\u0000\u0000\u0000\u0001",
"payload": "\u0000\u0000\u0000\u0000\u0003\u0002�G�z\u0014�(@\u0002\u001cChocolate Cake\u0002\u0000\u0002��Ё�]"
}
{
"key": "\u0000\u0000\u0000\u0002",
"payload": null
}
Testing the deletes 🔗
In Postgres, logically delete an order (order_id=3):
UPDATE orders SET cancelled_ind=TRUE WHERE order_id=3;
In ksqlDB, PRINTing the topic shows that there’s now a tombstone for this order:
rowtime: 2020/11/03 22:15:40.179 Z, key: 3, value: <null>
and this is confirmed by kafkacat (which isn’t surprising, since they’re consuming from the same topic):
{
"key": "\u0000\u0000\u0000\u0003",
"payload": null
}
Streaming to Elasticsearch 🔗
As with the source connector, I’m going to use ksqlDB to configure the connector, but you can use Kafka Connect directly if you’d rather. To learn more about streaming from Kafka to Elasticsearch see this tutorial and video.
So that the timestamp field is correctly mapped in Elasticsearch I create a dynamic template first:
curl -s -XPUT "http://localhost:9200/_template/rmoff/" -H 'Content-Type: application/json' -d'
{
"template": "*",
"mappings": { "dynamic_templates": [ { "dates": { "match": "*_TS", "mapping": { "type": "date" } } } ] }
}'
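If you want to double-check that the template registered correctly before any data arrives, you can read it back - a sketch that just fetches what the PUT above created:
$ curl -s http://localhost:9200/_template/rmoff | jq '.'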
Now create the sink connector:
ksql> CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics' = 'orders_processed',
'key.converter' = 'org.apache.kafka.connect.converters.IntegerConverter',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc',
'key.ignore' = 'false',
'schema.ignore' = 'true',
'behavior.on.null.values' = 'delete',
'transforms' = 'setTimestampType0',
'transforms.setTimestampType0.type' = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
'transforms.setTimestampType0.field' = 'UPDATE_TS',
'transforms.setTimestampType0.target.type' = 'Timestamp'
);
- behavior.on.null.values defaults to ignore, so make sure to set it to delete if that’s what you want it to do
- To use the message key for indexing and identifying the document to delete, you need to set key.ignore=false
ksql> SHOW CONNECTORS;
Connector Name | Type | Class | Status
------------------------------------------------------------------------------------------------------------------------
SOURCE_PG01 | SOURCE | io.confluent.connect.jdbc.JdbcSourceConnector | RUNNING (1/1 tasks RUNNING)
SINK_ELASTIC_01 | SINK | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector | RUNNING (1/1 tasks RUNNING)
------------------------------------------------------------------------------------------------------------------------
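The same status information is available from the Kafka Connect REST API if you’d rather query it directly - a sketch, assuming the worker’s REST port is exposed on localhost:8083:
$ curl -s http://localhost:8083/connectors/SINK_ELASTIC_01/status | jq '.'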
Testing end-to-end 🔗
Current database state:
postgres=# SELECT * FROM orders ORDER BY cancelled_ind;
order_id | order_total_usd | item | cancelled_ind | update_ts
----------+-----------------+----------------+---------------+----------------------------
1 | 12.34 | Chocolate Cake | f | 2020-11-03 22:13:34.981701
2 | 56.78 | More Cake | t | 2020-11-03 22:12:57.781796
3 | 910.11 | Franzbrötchen | t | 2020-11-03 22:15:39.808105
(3 rows)
Current Elasticsearch state:
$ curl -s http://localhost:9200/orders_processed/_search \
-H 'content-type: application/json' | jq '.hits.hits'
[
{
"_index": "orders_processed",
"_type": "_doc",
"_id": "1",
"_score": 1,
"_source": {
"ORDER_TOTAL_USD": 12.34,
"ITEM": "Chocolate Cake",
"CANCELLED_IND": false,
"UPDATE_TS": 1604441614981
}
}
]
Add a new order to the database:
INSERT INTO orders VALUES (4,12.13,'Parkin',false);
This appears in Elasticsearch:
$ curl -s http://localhost:9200/orders_processed/_search \
-H 'content-type: application/json' | jq '.hits.hits'
[
{
"_index": "orders_processed",
"_type": "_doc",
"_id": "1",
"_score": 1,
"_source": {
"ORDER_TOTAL_USD": 12.34,
"ITEM": "Chocolate Cake",
"CANCELLED_IND": false,
"UPDATE_TS": 1604441614981
}
},
{
"_index": "orders_processed",
"_type": "_doc",
"_id": "4",
"_score": 1,
"_source": {
"ORDER_TOTAL_USD": 12.13,
"ITEM": "Parkin",
"CANCELLED_IND": false,
"UPDATE_TS": 1604443348508
}
}
]
Mark order_id=1 as logically deleted in the database:
UPDATE orders SET cancelled_ind=TRUE WHERE order_id=1;
Document no longer exists in Elasticsearch:
$ curl -s http://localhost:9200/orders_processed/_search \
-H 'content-type: application/json' | jq '.hits.hits'
[
{
"_index": "orders_processed",
"_type": "_doc",
"_id": "4",
"_score": 1,
"_source": {
"ORDER_TOTAL_USD": 12.13,
"ITEM": "Parkin",
"CANCELLED_IND": false,
"UPDATE_TS": 1604443348508
}
}
]
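Another way to see the effect of the tombstone is to look up the deleted document directly by its key - a sketch that just fetches document _id 1 from the same index:
$ curl -s http://localhost:9200/orders_processed/_doc/1 | jq '.found'
Once the delete has been processed this should come back with found set to false.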
Footnotes 🔗
Compacted topics 🔗
As mentioned at the beginning, tombstones and compacted topics are often (but not always) two sides of the same coin. A compacted topic retains the latest value for every key, and so is a perfect way to retain state with which you might want to rehydrate a target datastore. To configure the topic to be compacted, run:
$ docker exec broker kafka-configs --alter \
--bootstrap-server broker:29092 \
--entity-type topics \
--entity-name orders_processed \
--add-config cleanup.policy=compact
Completed updating config for topic orders_processed.
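To confirm the policy took effect you can describe the topic’s configuration with the same tool - a sketch:
$ docker exec broker kafka-configs --describe \
    --bootstrap-server broker:29092 \
    --entity-type topics \
    --entity-name orders_processed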
Note that log compaction does not run straight away, and other factors (such as active segments) affect when you’ll actually be able to observe it taking place.
Why does the VALUE_FORMAT=KAFKA trick work? 🔗
When ksqlDB writes to a target stream (and thus an underlying topic) it has to serialise the data, and this is controlled by the VALUE_FORMAT, either explicitly stated in the DDL or inherited from the source stream. The serialisation will usually be something like Avro or Protobuf (which support schemas and thus the Schema Registry), or JSON (which doesn’t). Either way, it’ll write a payload that includes the schema, since a ksqlDB stream is a Kafka topic plus schema. Here’s what it’d look like writing out a NULL field with VALUE_FORMAT='JSON':
{
"offset": 5,
"key": "\u0000\u0000\u0000\u0001",
"payload": "{\"KSQL_COL_0\":null}"
}
By using the KAFKA format the value is written as a primitive, with no envelope or schema. You can read more about serialisation formats here.
Try it out! 🔗
Grab the Docker Compose file from here, run docker-compose up -d, and try this whole article out for yourself!