The Flatten
Single Message Transform (SMT) is useful when you need to collapse a nested message down to a flat structure.
To use the Single Message Transform you only need to reference it; thereโs no additional configuration required:
"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value"
You can optionally override the default delimiter (.
) thatโs used:
"transforms.flatten.delimiter" : "_"
Example - JDBC Sink connector ๐
See also ๐ฅ Kafka Connect in Action : JDBC Sink (๐พ demo code
) and ๐ฅ ksqlDB & Kafka Connect JDBC Sink in action (๐พ demo code
)
Given a source message that looks like this:
{
"FULL_NAME": "Opossum, american virginia",
"ADDRESS": {
"STREET": "20 Acker Terrace"
"CITY": "Lynchburg"
"COUNTY_OR_STATE": "Virginia"
"ZIP_OR_POSTCODE": "24515"
}
}
We canโt load it directly into a database because databases expect flat structures. If we try to load it as it is the JDBC Sink connector will fail and throw an error:
โฆ(STRUCT) type doesn't have a mapping to the SQL database column type
So we use the Single Message Transform to flatten the source payload:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day3-customers",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter" : "_"
}'
This will work, and you can now see the data in MySQL:
mysql> describe `day3-customers`;
+-------------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------------------+------+------+-----+---------+-------+
| FULL_NAME | text | YES | | NULL | |
| ADDRESS_STREET | text | YES | | NULL | |
| ADDRESS_CITY | text | YES | | NULL | |
| ADDRESS_COUNTY_OR_STATE | text | YES | | NULL | |
| ADDRESS_ZIP_OR_POSTCODE | text | YES | | NULL | |
+-------------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)
mysql> select * from `day3-customers`;
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| FULL_NAME | ADDRESS_STREET | ADDRESS_CITY | ADDRESS_COUNTY_OR_STATE | ADDRESS_ZIP_OR_POSTCODE |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| Opossum, american virginia | 20 Acker Terrace | Lynchburg | Virginia | 24515 |
| Red deer | 53 Basil Terrace | Lexington | Kentucky | 40515 |
| Laughing kookaburra | 84 Monument Alley | San Jose | California | 95113 |
| American bighorn sheep | 326 Sauthoff Crossing | San Antonio | Texas | 78296 |
| Skua, long-tailed | 7 Laurel Terrace | Manassas | Virginia | 22111 |
| Fox, bat-eared | 2946 Daystar Drive | Jamaica | New York | 11431 |
| Greater rhea | 97 Morning Way | Charleston | West Virginia | 25331 |
| Vervet monkey | 7615 Brown Park | Chicago | Illinois | 60681 |
| White spoonbill | 7 Fulton Parkway | Asheville | North Carolina | 28805 |
| Sun gazer | 61 Lakewood Gardens Parkway | Pensacola | Florida | 32590 |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
10 rows in set (0.00 sec)
Hereโs how to add the key into the target table:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day3-customers2",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter" : "_",
"pk.mode" : "record_key",
"pk.fields" : "id",
"key.converter" : "org.apache.kafka.connect.converters.LongConverter"
}'