The Cast
Single Message Transform lets you change the data type of fields in a Kafka message, supporting numerics, string, and boolean.
Cast
takes one argument listing the field(s) that you would like to transform and the target data type. Multiple fields can be specified by separating each specification with a comma.
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
Changing the data type of fields as they pass through Kafka Connect 🔗
Because we’re developers who appreciate the finer things in life, including the importance of schemas, we’re using a suitable serialisation method for our data - Avro, Protobuf, or JSON Schema. The beauty of this is that the full schema is persisted at ingest, and available for any consumer. It also means that we want to make sure that the schema is correct for the data.
Let’s use the REST API of the Schema Registry to take a look at the schema of the data that our source connector is streaming into Kafka:
$ curl -s "http://localhost:8081/subjects/day9-transactions-value/versions/latest" | jq '.schema|fromjson[]'
"fields": [
{
"name": "cost",
"type": [ "null", "string" ],
},
{
"name": "customer_remarks",
"type": [ "null", "string" ],
},
{
"name": "units",
"type": [ "null", "string" ],
},
{
"name": "card_type",
"type": [ "null", "string" ],
},
{
"name": "txn_date",
"type": [ "null", "string" ],
},
{
"name": "item",
"type": [ "null", "string" ],
}
]
(I’ve cut bits of the schema shown above for the sake of space)
This data streamed as-is to a target system will—unless the source system has something like Elasticsearch’s dynamic field mapping—remain in its string
/text
types:
mysql> describe `day9-transactions`;
+------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost | text | YES | | NULL | |
| customer_remarks | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | text | YES | | NULL | |
| item | text | YES | | NULL | |
| units | text | YES | | NULL | |
+------------------+------+------+-----+---------+-------+
6 rows in set (0.01 sec)
We saw previously how we handled the txn_date
being a string when it should be a timestamp. Now we will use the Cast
Single Message Transform to cast the two fields currently held as string to their correct numeric types.
We can do this at ingest or egress; if the conversion is to rectify an incorrect type then logically this should be done at ingest in the source connector. If it’s to fix a specific requirement in a technology being written to in a sink connector then it should be done there instead.
Here we’ll apply the fix to the source connector. Note that we’re using a new target topic (day9-01
) because otherwise you’ll quite rightly get an error (io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "day9-transactions-value"; error code: 409
).
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day9-01/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day9-01-transactions.with" : "#{Internet.uuid}",
"genv.day9-01-transactions.cost.with" : "#{Commerce.price}",
"genv.day9-01-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day9-01-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day9-01-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day9-01-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day9-01-transactions.item.with" : "#{Beer.name}",
"topic.day9-01-transactions.throttle.ms" : 1000,
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
}'
Now the schema of the data in Kafka is correct for the data:
"fields": [
{
"name": "txn_date",
"type": [ "null", "string" ],
},
{
"name": "units",
"type": [
"null", { "type": "int", "connect.type": "int16" } ],
},
{
"name": "customer_remarks",
"type": [ "null", "string" ],
},
{
"name": "cost",
"type": [ "null", "float" ],
},
{
"name": "item",
"type": [ "null", "string"
],
},
{
"name": "card_type",
"type": [ "null", "string" ],
}
]
and when it’s used in a sink connector the data is correctly stored in the target system:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day9-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password": "mysqlpw",
"topics" : "day9-01-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"}'
mysql> describe `day9-01-transactions`;
+------------------+----------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+----------+------+-----+---------+-------+
| txn_date | text | YES | | NULL | |
| units | smallint | YES | | NULL | |
| customer_remarks | text | YES | | NULL | |
| cost | float | YES | | NULL | |
| item | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
+------------------+----------+------+-----+---------+-------+
6 rows in set (0.00 sec)
mysql> select item, units, cost from `day9-01-transactions` LIMIT 5;
+-----------------------+-------+-------+
| item | units | cost |
+-----------------------+-------+-------+
| Alpha King Pale Ale | 29 | 17.49 |
| Brooklyn Black | 36 | 92.88 |
| St. Bernardus Abt 12 | 17 | 94.04 |
| Celebrator Doppelbock | 63 | 58.64 |
| Ten FIDY | 85 | 60.53 |
+-----------------------+-------+-------+
5 rows in set (0.00 sec)