One of my favourite hacks for getting data into Kafka is using kafkacat and stdin, often from jq. You can see this in action with Wi-Fi data, IoT data, and data from a REST endpoint. This is fine for getting values into a Kafka message, but Kafka messages are key/value, and being able to specify a key is often important.
Here’s a way to do that, using a separator and some jq magic. Note that at the moment kafkacat only supports single-byte separator characters, so you need to choose carefully. If you pick a separator that also appears in your data, it may well have unintended consequences.
Here’s a simple payload:
[{
"orderId": "X94",
"orderTotal": "42",
"productName": "ACME yak shaver"
},
{
"orderId": "X95",
"orderTotal": "2",
"productName": "ACME TNT"
}]
We want to stream this onto a Kafka topic as one message per object in the array, rather than multiple lines of input per object (or one single long line of the entire array). Let’s break this down and see how to do that. We’ll start by running it through jq with the --compact-output flag (or -c) to put each object on a single line, and a jq filter of .[] to explode the array:
echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
jq --compact-output '.[]'
{"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
{"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}
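To see why the compact flag matters, here is a quick sketch that compares line counts with and without it. kafkacat in producer mode treats each line of stdin as one message by default, so the line count is the message count. This assumes bash and jq are installed; the names payload and count_lines are made up for the illustration.

```shell
#!/usr/bin/env bash
# Illustrative only: "payload" and "count_lines" are names invented for this sketch.
payload='[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]'

# Count the lines jq emits when given the (optional) extra flags.
count_lines() {
  printf '%s\n' "$payload" | jq "$@" '.[]' | wc -l | tr -d ' '
}

# Only run the comparison if jq is actually available.
if command -v jq >/dev/null 2>&1; then
  echo "pretty-printed: $(count_lines) lines"                   # 10 lines
  echo "compact:        $(count_lines --compact-output) lines"  # 2 lines
fi
```

Without the flag each three-field object is spread over five lines, which would turn two orders into ten rather unhelpful messages.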
and now we can pipe it to Kafka:
echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
jq --compact-output '.[]' | \
kafkacat -b localhost:9092 -t orders01 -P
Consuming it back, we can see that this has worked, but the keys are currently null:
kafkacat -b localhost:9092 \
-t orders01 \
-C \
-f 'Key: %k, payload: %s\n'
Key: , payload: {"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
Key: , payload: {"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}
You may be fine with a null key, but often you will want one, whether to ensure that data for a particular instance of an entity always routes to the same partition, or just because it’s useful data to have in the message key when it comes to processing it (e.g. with ksqlDB).
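As a rough illustration of the "same key, same partition" idea, the sketch below hashes a key and takes it modulo a partition count. This is not how any real Kafka client computes partitions (they use their own hash functions, and cksum is only a stand-in), so the numbers will not match a real cluster. The helper name partition_for and the partition count of 3 are assumptions for the example.

```shell
#!/usr/bin/env bash
# Illustration only: real Kafka clients use their own hash functions
# (not cksum), so these partition numbers will not match a real cluster.
# "partition_for" is a hypothetical helper for this sketch.
partition_for() {
  local key=$1 num_partitions=$2
  local crc
  # cksum prints "<crc> <byte count>"; keep just the checksum.
  crc=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
  echo $(( crc % num_partitions ))
}

# The same key always maps to the same partition number.
for key in X94 X95 X94; do
  echo "key=$key -> partition $(partition_for "$key" 3)"
done
```

The point is only that the mapping is deterministic: with a null key there is nothing to hash, so that guarantee goes away.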
So let’s set the key now. We have two key (!) ingredients to the method:
- We’re going to set the -K parameter in kafkacat to declare the key/value delimiter. This can be a straightforward printable character (such as :), but what if your key value includes that character? Mayhem would ensue. Instead we can use a character that we would hope not to find in our actual key value. Just make sure that it’s a single byte (so fancy stuff like §, and emojis, are out ☠️ 😿 ☠️).
  To specify a non-printable character you can use the bash syntax of $'\x00' to specify the hex value of the character (in this case 00, which is a NULL). I’ve chosen to use \x1c in the example below.
- We’re going to use the same character in the jq filter and get jq to concatenate it with the field that we want to use as the key and prefix it to the full payload value that we had originally. To pass in the character value without quote mark and escaping hell we can set it as a variable at invocation using the --arg parameter.
  - We also set the --raw-output flag so that the string output doesn’t get written as a JSON string by jq. This is important for it to work as the subsequent stdin input to kafkacat.
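Since the single-byte rule is easy to trip over, here is a small sketch for checking how many bytes a candidate separator really occupies. It assumes bash (for the $'...' quoting); byte_len is a helper name made up for the example. One thing it shows is that bash variables cannot hold a NUL byte, so $'\x00' ends up as an empty string, which is one more reason to pick something like \x1c.

```shell
#!/usr/bin/env bash
# "byte_len" is a helper name invented for this sketch.
byte_len() {
  printf '%s' "$1" | wc -c | tr -d ' '
}

sep=$'\x1c'          # ASCII File Separator: one byte
section=$'\xc2\xa7'  # UTF-8 encoding of the section sign: two bytes
nul=$'\x00'          # bash strings cannot contain NUL, so this is empty

echo "0x1c:         $(byte_len "$sep") byte(s)"      # 1
echo "section sign: $(byte_len "$section") byte(s)"  # 2
echo "NUL:          $(byte_len "$nul") byte(s)"      # 0
```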
Our jq invocation now looks like this:
echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
jq --compact-output \
--raw-output \
--arg sep $'\x1c' \
'.[] | [.orderId + $sep, tostring] | join("")'
X94{"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
X95{"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}
The output shows our key value has been correctly prepended, but where’s our separator? That’s the joy of non-printable characters :) We can run it through hexdump to prove that it is there:
echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
jq --compact-output \
--raw-output \
--arg sep $'\x1c' \
'.[] | [.orderId + $sep, tostring] | join("")' | \
hexdump -C
                  There's the separator character
                  ||
                  VV
00000000 58 39 34 1c 7b 22 6f 72 64 65 72 49 64 22 3a 22 |X94.{"orderId":"|
00000010 58 39 34 22 2c 22 6f 72 64 65 72 54 6f 74 61 6c |X94","orderTotal|
00000020 22 3a 22 34 32 22 2c 22 70 72 6f 64 75 63 74 4e |":"42","productN|
00000030 61 6d 65 22 3a 22 41 43 4d 45 20 79 61 6b 20 73 |ame":"ACME yak s|
00000040 68 61 76 65 72 22 7d 0a 58 39 35 1c 7b 22 6f 72 |haver"}.X95.{"or|
00000050 64 65 72 49 64 22 3a 22 58 39 35 22 2c 22 6f 72 |derId":"X95","or|
00000060 64 65 72 54 6f 74 61 6c 22 3a 22 32 22 2c 22 70 |derTotal":"2","p|
00000070 72 6f 64 75 63 74 4e 61 6d 65 22 3a 22 41 43 4d |roductName":"ACM|
00000080 45 20 54 4e 54 22 7d 0a |E TNT"}.|
00000088
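If a full hex dump is more than you need, cat -v is a lighter way to make the separator visible. The sketch below uses a hand-built sample line rather than the real jq output, and assumes bash for the $'\x1c' syntax.

```shell
#!/usr/bin/env bash
# A hand-built sample line standing in for one line of the jq output above.
sep=$'\x1c'
line="X94${sep}"'{"orderId":"X94"}'

# cat -v renders control characters in caret notation,
# so 0x1c shows up as ^\ (caret, backslash).
printf '%s\n' "$line" | cat -v
# prints: X94^\{"orderId":"X94"}
```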
So, let’s hook this up to kafkacat:
echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
jq --compact-output \
--raw-output \
--arg sep $'\x1c' \
'.[] | [.orderId + $sep, tostring] | join("")' | \
kafkacat -b localhost:9092 -t orders02 -K$'\x1c' -P
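Given the earlier warning about a separator that also turns up in the key, it can be worth a quick sanity check before producing. This is a minimal sketch, assuming bash and grep; keys_contain_sep is a hypothetical helper and the keys are hard-coded here rather than pulled from real data.

```shell
#!/usr/bin/env bash
# "keys_contain_sep" is a hypothetical helper: it reads candidate keys on
# stdin (one per line) and succeeds if any of them contains the separator.
keys_contain_sep() {
  local sep=$1
  grep -q -F -- "$sep"
}

sep=$'\x1c'

# With jq the keys could come from something like:
#   jq --raw-output '.[].orderId' orders.json
# (orders.json is an assumed file name). Here they are hard-coded.
if printf '%s\n' 'X94' 'X95' | keys_contain_sep "$sep"; then
  echo "separator found in a key, pick a different one" >&2
else
  echo "no key contains the separator"
fi
```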
Let’s see what the data now looks like on the topic:
kafkacat -b localhost:9092 \
-t orders02 \
-C \
-f 'Key: %k, payload: %s\n'
Key: X94, payload: {"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
Key: X95, payload: {"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}
We can double check with ksqlDB too, and there are our keys:
ksql> PRINT orders02 FROM BEGINNING LIMIT 2;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/09/30 20:35:00.646 Z, key: X94, value: {"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
rowtime: 2020/09/30 20:35:00.646 Z, key: X95, value: {"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}
Footnote - what is that jq filter doing?
Kinda unintelligible, right?
'.[] | [.orderId + $sep, tostring] | join("")'
Let’s check it out.
 This is the actual filter that we want to use with the data.
 We're using .[] to explode the array. If you want a noop then just
 use . on its own
 |
 |   Now we pipe it to the next section
 |   |
 |   |                   |- This forces the object from the
 |   |                   |  previous section to a string
 V   V                   V
'.[] | [.orderId + $sep, tostring] | join("")'
        ^          ^                 ^
        |          |                 |--- Joins the array that the [ ] created
        |          |                      so that the output is on a single line
        |          |
        |          This is the separator character variable,
        |          set in the --arg parameter
        |
        This is the field that we want to use as the message key
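If the array-and-join approach feels roundabout, the same line can be built with jq string interpolation. This is just an alternative sketch, not the filter used above; it assumes bash and jq, and keyed_lines is a name invented for the example. It runs both filters over the sample payload and compares the results.

```shell
#!/usr/bin/env bash
# Sketch: the same key<sep>value line built with jq string interpolation.
# "payload" and "keyed_lines" are names invented for this example.
payload='[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]'

# Run the given jq filter over the payload with $sep set to 0x1c.
keyed_lines() {
  local filter=$1
  printf '%s\n' "$payload" | jq --raw-output --arg sep $'\x1c' "$filter"
}

if command -v jq >/dev/null 2>&1; then
  original=$(keyed_lines '.[] | [.orderId + $sep, tostring] | join("")')
  interpolated=$(keyed_lines '.[] | "\(.orderId)\($sep)\(tojson)"')
  if [ "$original" = "$interpolated" ]; then
    echo "both filters produce identical output"
  fi
fi
```

Which one reads better is a matter of taste; the interpolated form avoids building an array only to join it again.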