NullPointerException для kafka-connect-jdbc (postgres) без надлежащей трассировки (WorkerSinkTask.deliverMessages) - PullRequest
0 голосов
/ 01 апреля 2019

Я использую JdbcSinkConnector с postgres.

Конфиг:

{
   "name":"jdbc",
   "config":{
      "topics":"some_topic",
      "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:postgresql://ip/sometable?user=postgresadmin&password=pass",
      "auto.create": true,
      "table.name.format": "sometable",
      "pk.mode": "none",
      "insert.mode": "insert",
      "tasks.max":"1",
      "transforms": "ReplaceValueWithAfter",
      "transforms.ReplaceValueWithAfter.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
      "transforms.ReplaceValueWithAfter.field":"after"
   }  
}

Данные в формате avro.Когда я запускаю работника, он обрабатывает около 370 тыс. Записей и завершается сбоем в одном конкретном сообщении, что выглядит вполне нормально.Это не повреждено или так.Единственное, что у него есть одно поле micro timestamp = null (столбец может иметь значение null как в схеме avro, так и в базе данных, которую создает этот соединитель).

Всегда получать

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException 

Однакооднажды получил что-то более подробное (я думаю, что pk.mode установлен в kafka):

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:134)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:70)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:138)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:124)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more
version: "2.1.0-cp1",
commit: "bda8715f42a1a3db",

Я перепробовал все возможные комбинации (с режимом ключа kafka, режимами вставки, полями белого списка для ограниченияих по минимуму и тд).Ничего не работает: (

...