ОШИБКА Обнаружена недопустимая ошибка документа при выполнении пакета 7 из 1 записей Kafka Connect for Elastci Search - PullRequest
0 голосов
/ 04 февраля 2020

Итак, сначала я перечисляю версию этой ошибки. Elasti c search 6.0 Kafka connect 5.4.0

У меня есть connect-distributed.properties

bootstrap.servers=b-***.eu-west-1.amazonaws.com:9092,b-***.eu-west-1.amazonaws.com:9092,b-***.eu-west-1.amazonaws.com:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java

Мой завиток для создания соединителя

sudo curl  -XPOST -H "Content-Type: application/json" --data '{"name":"elastic-search-sink-audit-distributed","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"2","topics":"audit_event_distributed_3","key.ignore":"true","connection.url":"https://***.us-east-1.es.amazonaws.com","type.name":"kafka-connect_distributed","name":"elastic-search-sink-audit-distributed","key.ignore": "true","schema.ignore": "true","errors.tolerance":"all","errors.deadletterqueue.topic.name":"dlq_distributed"}}' http://localhost:8083/connectors | jq

Как вы видите, я упомянул

"schema.ignore": "true"
"errors.tolerance":"all"

И это мой пример сообщения

{ "ID":"avro2-9749-0e710000fd04", "VERSION":"1", "ACTION_TYPE":"NEW_CASE", "EVENT_TYPE":"WORLDCHECK", "CLIENT_ID":"fgh-5d1e-17a2-9749-0e4d00", "DETAILS":"<?xml version=\"1.0\" <gender>MALE</gender></caseCreatedPayload>", "OBJECT_TYPE":"CASE", "UTC_DATE_TIME":"1578469623000", "POINT_IN_TIME_PRECISION":"TIME", "TIME_ZONE":"UTC", "TIMELINE_PRECISION":"ON", "GROUP_ID":"0a348753-5d1e-17a2-9749-0e4d0000146d", "OBJECT_DISPLAY_NAME":"NULL", "OBJECT_ID":"0a348753-5d1e-17af-9749-0e7100006ccf", "USER_DISPLAY_NAME":"USER_FIRST_6cb4c322-cd3d-4809-97d3-07d2d96f10ed", "USER_ID":"USER_LAST_7e99cad9-dc1c-4770-ac4f-33c4897ce404", "PARENT_EVENT_ID":"0a348752-5d17-138e-9749-0e6a00000c7f", "NOTES":"null", "SUMMARY":"sumary", "AUDIT_EVENT_TO_UTC_DT":"1578469621000", "AUDIT_EVENT_TO_DATE_PITP":"null", "AUDIT_EVENT_TO_DATE_TZ":"null", "AUDIT_EVENT_TO_DATE_TP":"null"}

Наконец, когда я положить что-то в topi c я получаю ниже ошибка

[2020-02-04 08:32:16,629] INFO [Consumer clientId=consumer-13, groupId=connect-elastic-search-sink-audit-distributed] Resetting offset for partition audit_event_distributed_3-44 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2020-02-04 08:32:16,650] ERROR Encountered an illegal document error when executing batch 6 of 1 records. Error was [{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"not_x_content_exception","reason":"Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}] (to ignore future records like this change the configuration property 'behavior.on.malformed.documents' from 'fail' to 'ignore'). (io.confluent.connect.elasticsearch.bulk.BulkProcessor:421)
[2020-02-04 08:32:16,650] WARN Failed to execute batch 6 of 1 records with attempt 1/6, will attempt retry after 16 ms. Failure reason: Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"not_x_content_exception","reason":"Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}] (io.confluent.connect.elasticsearch.bulk.BulkProcessor:393)

Я здесь пропускаю какие-либо свойства?

1 Ответ

0 голосов
/ 04 февраля 2020

Попробуйте изменить value.converter свойство:

value.converter = org. apache .kafka.connect. json .JsonConverter

...