Я использую JdbcSinkConnector с postgres.
Конфиг:
{
"name":"jdbc",
"config":{
"topics":"some_topic",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://ip/sometable?user=postgresadmin&password=pass",
"auto.create": true,
"table.name.format": "sometable",
"pk.mode": "none",
"insert.mode": "insert",
"tasks.max":"1",
"transforms": "ReplaceValueWithAfter",
"transforms.ReplaceValueWithAfter.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ReplaceValueWithAfter.field":"after"
}
}
Данные в формате avro.Когда я запускаю работника, он обрабатывает около 370 тыс. Записей и завершается сбоем в одном конкретном сообщении, что выглядит вполне нормально.Это не повреждено или так.Единственное, что у него есть одно поле micro timestamp = null (столбец может иметь значение null как в схеме avro, так и в базе данных, которую создает этот соединитель).
Всегда получать
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
Однакооднажды получил что-то более подробное (я думаю, что pk.mode установлен в kafka):
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:134)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:70)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:138)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:124)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
version: "2.1.0-cp1",
commit: "bda8715f42a1a3db",
Я перепробовал все возможные комбинации (с режимом ключа kafka, режимами вставки, полями белого списка для ограниченияих по минимуму и тд).Ничего не работает: (