Я использую эти примеры Debezium.
Я добавил "topics.regex": "CID1122.(.*)" в свой jdbc-sink.json следующим образом:
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "CID1122.(.*)",
"connection.url": "jdbc:mysql://mysql:3306/inventory?verifyServerCertificate=false",
"connection.user": "root",
"connection.password": "debezium",
"auto.create": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"name": "jdbc-sink",
"insert.mode": "upsert",
"pk.fields": "id,companyId",
"pk.mode": "record_value"
}
}
Список топиков Kafka:
CID1122.department
CID1122.designation
CID1122.employee
В Kafka Connect я сталкиваюсь с исключением java.lang.NullPointerException:
connect_1 | 2019-01-30 06:14:47,302 INFO || Checking MySql dialect for existence of table "CID1122"."employee" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,303 INFO || Using MySql dialect table "CID1122"."employee" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,342 INFO || Checking MySql dialect for existence of table "CID1122"."employee" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,343 INFO || Using MySql dialect table "CID1122"."employee" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,344 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1 | java.lang.NullPointerException
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
connect_1 | at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1 | at java.lang.Thread.run(Thread.java:748)
connect_1 | 2019-01-30 06:14:47,345 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
connect_1 | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1 | at java.lang.Thread.run(Thread.java:748)
connect_1 | Caused by: java.lang.NullPointerException
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
connect_1 | at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
connect_1 | ... 10 more
connect_1 | 2019-01-30 06:14:47,345 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
Есть ли какой-нибудь обходной путь?