Kafka Connect JDBC Sink - themes.regex не работает - PullRequest
0 голосов
/ 30 января 2019

Я использую это примеры дебезия

Я добавил "themes.regex": "CID1122. (. *)" в моем jdbc-sink.json следующим образом

{
"name": "jdbc-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "CID1122.(.*)",
    "connection.url": "jdbc:mysql://mysql:3306/inventory?verifyServerCertificate=false",
    "connection.user": "root",
    "connection.password": "debezium",
    "auto.create": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "name": "jdbc-sink",
    "insert.mode": "upsert",
    "pk.fields": "id,companyId",
    "pk.mode": "record_value"
}
}

Список тем Кафки:

CID1122.department
CID1122.designation
CID1122.employee

Я сталкиваюсь с кафкой java.lang.NullPointerException

connect_1    | 2019-01-30 06:14:47,302 INFO   ||  Checking MySql dialect for existence of table "CID1122"."employee"   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1    | 2019-01-30 06:14:47,303 INFO   ||  Using MySql dialect table "CID1122"."employee" absent   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1    | 2019-01-30 06:14:47,342 INFO   ||  Checking MySql dialect for existence of table "CID1122"."employee"   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1    | 2019-01-30 06:14:47,343 INFO   ||  Using MySql dialect table "CID1122"."employee" absent   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1    | 2019-01-30 06:14:47,344 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.   [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1    | java.lang.NullPointerException
connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
connect_1    |  at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connect_1    |  at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connect_1    |  at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect_1    |  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1    |  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1    |  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1    |  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1    |  at java.lang.Thread.run(Thread.java:748)
connect_1    | 2019-01-30 06:14:47,345 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
connect_1    | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect_1    |  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1    |  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1    |  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1    |  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1    |  at java.lang.Thread.run(Thread.java:748)
connect_1    | Caused by: java.lang.NullPointerException
connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
connect_1    |  at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connect_1    |  at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connect_1    |  at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
 connect_1    |  ... 10 more
 connect_1    | 2019-01-30 06:14:47,345 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]

любой обходной путь?

1 Ответ

0 голосов
/ 22 марта 2019

Вам не хватает table.name.format свойство https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_config_options.html (раздел отображения данных)

здесь рабочий пример:

{
    "name": "test-0005",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics.regex": "CID1122.(.*)",
    "connection.user": "kafka",
    "table.name.format": "${topic}",
    "connection.password": "kafka",
    "connection.url": "jdbc:mysql://databasehost:3306/dbname",
    "auto.create": "true",
    "transforms": "route",
    "transforms.t1.replacement": "$2",
    "transforms.route.regex": "([^.]+)\\.([^.]+)",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
}

Чтопроисходит?

  • table.name.format: определяет таблицу назначения для записи событий, в этом случае я использую $ {topic} заполнитель, это означает, что он будет использовать имя темы.
  • themes.regex: будет получать данные из каждой темы, которая соответствует этому шаблону

, как вы можете видетьЯ добавил преобразование RegexRouter , чтобы динамически извлекать имя темы перед погружением в MySQL, шаблон, который я использовал: ([^.] +) \. ([^.] +) соответствует нашему themes.regex CID1122. [Имя-события] , а затем я извлек только группу 2 (имя-события).

  • transforms:"route"
  • transforms.t1.regex:"([^.] +) \. ([^.] +) "
  • transforms.t1.replacement: " $ 2 "

в концеэта группа $ 2 будет передана в table.name.format как $ {topic} , затем вы можете подключиться к базе данных и проверить поступающие данные.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...