Обработка ошибок для недопустимого JSON в коннекторе раковины kafka - PullRequest
0 голосов
/ 11 февраля 2020

У меня есть приемник для mongodb, который берет json из топи c и помещает его в коллекцию mongoDB. Но когда я отправляю недопустимую JSON от производителя на эту топи c (например, с недопустимым специальным символом ") => {"id":1,"name":"\"}, соединитель останавливается. Я попытался использовать errors.tolerance = all, но то же самое происходит то, что соединитель должен пропускать и регистрировать недействительный JSON, и поддерживать соединитель в рабочем состоянии. Мой соединитель в распределенном режиме выглядит следующим образом:

{
  "name": "sink-mongonew_test1",  
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "error7",
    "connection.uri": "mongodb://****:27017", 
    "database": "abcd", 
    "collection": "abc",
    "type.name": "kafka-connect",
    "key.ignore": "true",

    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "value.projection.list": "id",
    "value.projection.type": "whitelist",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",

    "delete.on.null.values": "false",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",

    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.deadletterqueue.topic.name": "crm_data_deadletterqueue",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}

1 Ответ

0 голосов
/ 12 февраля 2020

Начиная с Apache Kafka 2.0, Kafka Connect включает опции обработки ошибок , в том числе функциональность для маршрутизации сообщений в очередь недоставленных сообщений, распространенную технику построения конвейеров данных.

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

Как прокомментировано, вы используете connect-api-1.0.1.*.jar, версия 1.0.1, поэтому это объясняет, почему эти свойства не работают

Ваши альтернативы помимо запуска более новой версии Kafka Connect включают Nifi или Spark Structured Streaming

...