У меня есть приемник для mongodb, который берет json из топи c и помещает его в коллекцию mongoDB. Но когда я отправляю недопустимую JSON от производителя на эту топи c (например, с недопустимым специальным символом ") => {"id":1,"name":"\"}
, соединитель останавливается. Я попытался использовать errors.tolerance = all, но то же самое происходит то, что соединитель должен пропускать и регистрировать недействительный JSON, и поддерживать соединитель в рабочем состоянии. Мой соединитель в распределенном режиме выглядит следующим образом:
{
"name": "sink-mongonew_test1",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "error7",
"connection.uri": "mongodb://****:27017",
"database": "abcd",
"collection": "abc",
"type.name": "kafka-connect",
"key.ignore": "true",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"value.projection.list": "id",
"value.projection.type": "whitelist",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
"delete.on.null.values": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "crm_data_deadletterqueue",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true"
}
}