Соединитель Kafka- Postgres jdb c - NullPointerException - PullRequest
0 голосов
/ 05 марта 2020

Я выполняю следующую команду для копирования данных из Kafka в Postgres

/ confluent / confluent-5.4.0 / bin / connect-standalone /confluent/confluent-5.4.0/etc/ kafka / connect-standalone.properties /confluent/confluent-5.4.0/config/sink-postgres

и получение исключения ниже

[2020-03-05 10:16:21,289] ERROR WorkerSinkTask{id=sink-postgres-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
java.lang.NullPointerException
        at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-03-05 10:16:21,290] ERROR WorkerSinkTask{id=sink-postgres-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        ... 10 more
[2020-03-05 10:16:21,291] ERROR WorkerSinkTask{id=sink-postgres-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

Файл свойств приемника:

name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=ksqldb_topic_01
connection.url=jdbc:postgresql://10.22.2.40:5432/my_db?user=postgres&password=postgres
user=postgres&password=postgres
insert.mode=insert
pk.mode=none
pk.fields=none
auto.create=true
auto.evolve=true

Данные выглядят так

{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}

Что я делаю не так?

...