Как игнорировать результат ошибки в Kafka Connect Elasticsearch - PullRequest
0 голосов
/ 14 января 2020

Я пытаюсь запустить kafka connect для поиска elasti c. Но из-за какой-то ошибки я ввел неправильную запись в kafka topi c.

Теперь я исправил эту проблему и вставил правильное значение, но elasti c поиск все еще вызывает ошибку в предыдущей записи в topi c

Вот ошибка

Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lambdaDemo0': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"lambdaDemo0-9749-0e710000fd04"; line: 1, column: 13]

Можно ли как-нибудь проигнорировать старую запись в топи c и сказать kafka connect, чтобы она выбрала последнюю запись? Я пытаюсь удалить topi c, я получаю topi c, помеченный для удаления, но все еще есть записи в topi c.

Я пробовал ниже два свойства, но, кажется, работает

drop.invalid.message=true
behavior.on.malformed.documents=ignore

Подскажите, пожалуйста, как я могу очистить неправильную запись в топи c

1 Ответ

1 голос
/ 14 января 2020

Вы можете указать Kafka Connect просто пропускать плохие записи

errors.tolerance = all

При желании вы можете направить эти сообщения в другую топи c (известную как очередь недоставленных сообщений) для проверки, добавив

errors.tolerance = all
errors.deadletterqueue.topic.name = my-dlq-topic

Эти настройки действительны для Kafka Connect с любым соединителем, который не работает на этапе сериализации / десериализации обработки. Для получения дополнительной информации см эта статья .

...