Я использую коннектор приемника JDBC и у меня плохое сообщение в теме.Я знаю, почему сообщение плохое (оно терпит неудачу из-за нарушения ограничения FK из-за проблемы с производителем).Ошибка, сообщаемая рабочей задачей:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))\ncom.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))\n
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)\n\t... 10 more\nCaused by: java.sql.SQLException: java.sql.BatchUpdateException:
Cannot add or update a child row: a foreign key constraint fails
(`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor`
(`id`))\ncom.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolation
Exception: Cannot add or update a child row: a foreign key constraint
fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY
(`sensorId`) REFERENCES `sensor` (`id`))
Я хочу, чтобы это плохое сообщение было пропущено.Итак, я попытался установить "errors.tolerance": "all"
.Полная конфигурация для коннектора приемника выглядит следующим образом:
{
"name": "reading-sink2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 4,
"topics": "READING_MYSQL",
"key.converter.schema.registry.url": "http://localhost:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "jdbc:mysql://localhost:3306/sensorium?user=app&password=tQpRMCzHlAeu6kQIBk4U",
"auto.create": true,
"table.name.format": "reading",
"errors.tolerance": "all"
}
}
Но эта же ошибка регистрируется, сообщение не пропускается, а последующие сообщения не обрабатываются.
Почемуerrors.tolerance: all
не работает должным образом?