I use the Elastic stack for a lot of my talks and demos because it complements Kafka brilliantly. A few things have changed in recent releases and this blog is a quick note on some of the errors that you might hit and how to resolve them. It was inspired by a lot of the comments and discussion here and here.
tl;dr How do I stream data from Kafka to Elasticsearch? 🔗
Use Kafka Connect! Not sure what Kafka Connect is or why you should use it instead of something like Logstash? Check out the talk I did at Kafka Summit in London earlier this year.
Kafka Connect’s Elasticsearch sink connector has been improved in 5.3.1 to fully support Elasticsearch 7.
To stream data from a Kafka topic to Elasticsearch, create a connector using the Kafka Connect REST API. The parameters vary slightly between releases of Elasticsearch.
Some general notes:
- This assumes your data is serialised based on the defaults specified in your Kafka Connect workers (e.g. Avro). If it’s not, you will need to add overrides; see this article for a detailed explanation of why and how.
- If you’re streaming data to Elasticsearch from KSQL you will need to set the key converter to STRING, since this is currently (October 2019 / 5.4.0-beta1) all that is supported for keys:
  "key.converter":"org.apache.kafka.connect.storage.StringConverter"
- The connector will automagically change upper-case topic names to lower-case index names in Elasticsearch; unlike in previous versions, you don’t need to manually map this.
- You can use a regex to match multiple topics; just specify `topics.regex` in place of `topics` in the configuration (see the snippet after this list).
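For illustration, here’s a sketch of how those overrides might look inside a connector configuration; the regex `orders-.*` is just a placeholder for your own topic names, and `topics.regex` goes in place of (not as well as) `topics`:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"topics.regex": "orders-.*"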
For the full reference guide to the Kafka Connect Elasticsearch connector, including all its capabilities (such as exactly-once delivery) and configuration options, see here.
To schema or not to schema? 🔗
- `schema.ignore=true`: You want to just chuck a JSON document at Elasticsearch and have it figure out the field mapping types automagically (which it does pretty well, or you can force using dynamic mapping templates). Also applicable if you don’t have an explicit schema in your data, such as schema-less JSON (most JSON is schema-less) or CSV etc. If you are using Avro, you have a schema 🙌.
- `schema.ignore=false`: Your data has a schema (Avro, or JSON with embedded schema) and you want Kafka Connect to create the mapping explicitly in Elasticsearch when it pushes the data over.
Still not sure? Just wanna get data into Elasticsearch without really getting into the weeds of detail? Start off with `schema.ignore=true`.
Kafka to Elasticsearch 7 🔗
This works with the Kafka Connect Elasticsearch sink connector >=5.3.0:
curl -s -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-01/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch7:9200",
"type.name": "_doc",
"topics": "sample_topic",
"key.ignore": "true",
"schema.ignore": "true"
}'
Note: The `type.name` is `_doc`; other values may cause problems in some configuration permutations. You can also leave it blank in some situations.
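Once the connector is running, one way to check that data is arriving is to query the target index directly. This assumes Elasticsearch is listening on localhost:9200 and, as noted above, that the index takes the (lower-cased) name of the topic:
curl -s "http://localhost:9200/sample_topic/_search?size=1"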
Kafka to Elasticsearch 6 and earlier 🔗
The only difference from above is the type name: unless you’re using a specific type in your target index by design, you can use any value here, but you can’t leave it blank.
curl -s -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-01/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch6:9200",
"type.name": "foobarwibble",
"topics": "sample_topic",
"key.ignore": "true",
"schema.ignore": "true"
}'
Templates in Elasticsearch 7 🔗
Sometimes you’ll want to use templates with Elasticsearch for things such as defining the field types to be used in the document mapping. This has changed a bit in recent versions and caught me out.
If you copy and paste template definitions that you’ve found lying around on t’internet such as this one:
curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
{
"template": "*",
"settings": { "number_of_shards": 1, "number_of_replicas": 0 },
"mappings": { "_default_": { "dynamic_templates": [ { "dates": { "match": "*_TS", "mapping": { "type": "date" } } } ] } }
}'
You’ll now get this error, which is deliberate (Elasticsearch 7 removed mapping types, so the `_default_` type is no longer valid):
{
"error": {
"root_cause": [
{
"type": "mapper_parsing_exception",
"reason": "Root mapping definition has unsupported parameters: [_default_ : {dynamic_templates=[{dates={mapping={type=date}, match=*_TS}}]}]"
}
],
"type": "mapper_parsing_exception",
"reason": "Failed to parse mapping [_doc]: Root mapping definition has unsupported parameters: [_default_ : {dynamic_templates=[{dates={mapping={type=date}, match=*_TS}}]}]",
"caused_by": {
"type": "mapper_parsing_exception",
"reason": "Root mapping definition has unsupported parameters: [_default_ : {dynamic_templates=[{dates={mapping={type=date}, match=*_TS}}]}]"
}
},
"status": 400
}
To get this to work, just remove the type name (`_default_`) from the `mappings` element entirely:
curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
{
"template": "*",
"settings": { "number_of_shards": 1, "number_of_replicas": 0 },
"mappings": { "dynamic_templates": [ { "dates": { "match": "*_TS", "mapping": { "type": "date" } } } ] }
}'
HOWEVER, this only works for Elasticsearch 7; on Elasticsearch 6 and earlier you will get `Malformed [mappings] section for type [dynamic_templates], should include an inner object describing the mapping`.
If you’re using a template with Elasticsearch 7 then you must specify `"type.name": "_doc"` in your connector configuration. A blank or other value will cause the connector to fail.
Dealing with errors 🔗
Both the Elasticsearch sink connector and Kafka Connect itself have error handling support. See this article for details of how Kafka Connect does it. By default the connector will abort as soon as it hits a problem, but you may not want this; to make the connector more tolerant of errors, you can set:
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true,
"behavior.on.malformed.documents": "warn"
This is the most permissive configuration. `behavior.on.malformed.documents` is a connector property which, when set to `warn` (or `ignore`), will make the connector continue rather than abort, which is its default setting. The remaining configuration items are for Kafka Connect itself and deal with errors in the deserialisation process and in any Single Message Transforms that have been configured.
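Putting that together, here’s a sketch of the Elasticsearch 7 connector from earlier with the error-handling settings added (same placeholder hostnames and topic as above):
curl -s -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-01/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch7:9200",
"type.name": "_doc",
"topics": "sample_topic",
"key.ignore": "true",
"schema.ignore": "true",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"behavior.on.malformed.documents": "warn"
}'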
Common errors 🔗
Field … is a metadata field 🔗
- Error: `org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{"type":"mapper_parsing_exception","reason":"Field [_type] is a metadata field and cannot be added inside a document. Use the index API request parameters."}]`
- Cause: You’ve got a field called `_type` in your Kafka message that you’re sending to Elasticsearch
- Solution: Drop or rename the field, e.g. with a Single Message Transform (see the sketch after this list) or at source
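As a sketch, one way to drop the field with a Single Message Transform is the ReplaceField transform; the transform alias `dropTypeField` is arbitrary, and these lines would be added to the connector configuration:
"transforms": "dropTypeField",
"transforms.dropTypeField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropTypeField.blacklist": "_type"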
Rejecting mapping update […] as the final mapping would have more than 1 type 🔗
- Error: `WARN Encountered an illegal document error when executing batch 4 of 1 records. Ignoring and will not index record. Error was [{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [sample_topic] as the final mapping would have more than 1 type: [_doc, foo]"}] (io.confluent.connect.elasticsearch.bulk.BulkProcessor)`
- Cause 1: Elasticsearch index already exists with a different type in the mapping
- Cause 2: Template with dynamic mapping exists and `type.name` has been specified
- Solution: Unset `type.name` (i.e. `"type.name": ""`), or use the type that already exists (in the above example it’s `_doc`); a quick way to check the existing type is shown below
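If you’re not sure what type the existing index has, you can check its mapping directly (assuming Elasticsearch on localhost:9200 and the index from the example above):
curl -s "http://localhost:9200/sample_topic/_mapping"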
Validation Failed […] type is missing 🔗
- Error: `org.apache.kafka.connect.errors.ConnectException: Bulk request failed: {"root_cause":[{"type":"action_request_validation_exception","reason":"Validation Failed: 1: type is missing;2: type is missing;3: type is missing;4: type is missing;5: type is missing;"}],"type":"action_request_validation_exception","reason":"Validation Failed: 1: type is missing;2: type is missing;3: type is missing;4: type is missing;5: type is missing;"}`
- Cause 1: Using a blank `type.name` in the Kafka Connect connector configuration when indexing against Elasticsearch 7 with `schema.ignore=false`
- Cause 2: Using a blank `type.name` in the Kafka Connect connector configuration when indexing against Elasticsearch versions prior to 7
- Solution: Specify a non-blank `type.name` in the Kafka Connect connector configuration
Task is being killed and will not recover until manually restarted 🔗
- Error: `Task threw an uncaught and unrecoverable exception org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler Task is being killed and will not recover until manually restarted`
- Cause: This is the Kafka Connect framework logging that a connector has failed
- Solution: Inspect the Kafka Connect worker log more closely to find the actual error logged by the connector task; the connector’s status endpoint (shown below) will usually surface the stack trace too
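The Kafka Connect REST API is often the quickest way to find that error; the status endpoint for the connector (here assuming the worker on localhost:8083 and the connector name used earlier) includes a trace field with the underlying exception for any failed task:
curl -s "http://localhost:8083/connectors/sink-elastic-01/status"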
java.io.CharConversionException: Invalid UTF-32 character 🔗
- Error: `org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1010443 (above 0x0010ffff) at char # 1, byte #7) java.io.CharConversionException: Invalid UTF-32 character 0x1010443 (above 0x0010ffff) at char #1, byte #7)`
- Cause: Using the JSON converter (`org.apache.kafka.connect.json.JsonConverter`) to read Avro data
- Solution: Use the Avro converter (`io.confluent.connect.avro.AvroConverter`); see the sketch after the note below
Note: Kafka Connect deserialises the key and the value of a message separately, and it is not uncommon to have different serialisation formats used for each. For example, data from KSQL may have a String key and an Avro value.
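For illustration, a sketch of pointing the value converter at Avro in the connector configuration; the Schema Registry URL shown is an assumption and will depend on your environment:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"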
Error deserializing Avro message for id -1 Unknown magic byte! 🔗
- Error: `org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic sample_topic to Avro: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 org.apache.kafka.common.errors.SerializationException: Unknown magic byte!`
- Cause: Using the Avro converter (`io.confluent.connect.avro.AvroConverter`) to read JSON data
- Solution: Use the JSON converter (`org.apache.kafka.connect.json.JsonConverter`); see the sketch after the note below
Note: Kafka Connect deserialises the key and the value of a message separately, and it is not uncommon to have different serialisation formats used for each. For example, data from KSQL may have a String key and an Avro value.
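Conversely, a sketch of pointing the value converter at JSON; setting `schemas.enable` to false here assumes your JSON does not carry an embedded schema:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"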
Cannot infer mapping without schema 🔗
- Error: `org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.`
- Cause: If you have set `schema.ignore=false` then the connector will create the mapping in the target index for you, based on the schema of your data. BUT, for it to obtain the schema, there has to be a schema! Which means either using Avro, or using JSON with the schema embedded and the connector’s converter configured to expect it.
- Solution: Use Avro! It will save you tears and time and money. If you can’t change how you produce the data, consider using KSQL to reserialise the topic into Avro. Or, write JSON in the required structure and set `value.converter.schemas.enable=true`.
JsonConverter with schemas.enable requires "schema" and "payload" fields 🔗
- Error: `Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.`
- Cause: You’ve set `schemas.enable=true` for your converter, but the JSON is not in the correct structure. See here for details.
- Solution: Depending on what you’re trying to do, either (a) use Avro, (b) produce your JSON with the schema/payload in the correct structure (a sketch follows the note below), or (c) set `value.converter.schemas.enable=false` (if you don’t care about the schema and want to set `schema.ignore=true` for the Elasticsearch connector).
Note: `schemas.enable` is a converter configuration, so it can be set for both `value.converter` and `key.converter`, and you can hit this error against both fields.
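For reference, a minimal sketch of what JSON with an embedded schema looks like to the JsonConverter; the field names here are made up for illustration:
{
  "schema": {
    "type": "struct",
    "name": "sample",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "name": "foo" }
}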
Compressor detection can only be called on some xcontent bytes 🔗
- Error: `Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"not_x_content_exception","reason":"Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}] (io.confluent.connect.elasticsearch.bulk.BulkProcessor:393)`
- Cause: This can come about if you try to read JSON data from a topic using the String converter (`org.apache.kafka.connect.storage.StringConverter`) and have `"schema.ignore": "true"`, because you end up with a single field of data. This in turn causes Elasticsearch to throw this error when Kafka Connect tries to index the data into it.
- Solution: If it’s JSON data in the topic, use the `org.apache.kafka.connect.json.JsonConverter`, i.e. `"value.converter": "org.apache.kafka.connect.json.JsonConverter"`
Root mapping definition has unsupported parameters: [type : text] 🔗
- Error: `org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"_doc":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}} -- {"root_cause":[{"type":"mapper_parsing_exception","reason":"Root mapping definition has unsupported parameters: [type : text] [fields : {keyword={ignore_above=256, type=keyword}}]"}],"type":"mapper_parsing_exception","reason":"Root mapping definition has unsupported parameters: [type : text] [fields : {keyword={ignore_above=256, type=keyword}}]"}`
- Cause: This is an error from Elasticsearch and can have various causes. One is if you try to read JSON data from a topic using the String converter (`org.apache.kafka.connect.storage.StringConverter`) and have `"schema.ignore": "false"`, because you end up with a single field of data. This in turn causes Elasticsearch to throw this error when Kafka Connect tries to index the data into it.
- Solution: If it’s JSON data in the topic, use the `org.apache.kafka.connect.json.JsonConverter`, i.e. `"value.converter": "org.apache.kafka.connect.json.JsonConverter"`
Want to try it out yourself? 🔗
You can find my test rig on GitHub here.