Обновление коннектора Debezium MySQL с опцией белого списка таблиц - PullRequest
0 голосов
/ 28 ноября 2018

Я использую MySQL-коннектор Debezium (0.7.5) и пытаюсь понять, каков наилучший подход, если я хочу обновить эту конфигурацию с параметром table.whitelist.

Допустим,Я создаю соединитель, что-то вроде этого:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'

Через некоторое время (2 недели) мне нужно добавить новую таблицу (myDb.table3) к этой опции table.whitelist (и эта таблица являетсястарый, он был создан до коннектора)

То, что я пробовал, было:

  • Приостановить коннектор.
  • Удален раздел истории (возможно, это была проблема?).
  • Обновлена ​​конфигурация через конечную точку конфигурации обновления API.
  • Возобновить разъем.

Команда обновления через API:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
}'

Но это не сработало, и, возможно, это не лучший подход вообще.В других соединителях я не использую опцию table.whitelist, поэтому, когда мне нужно было прослушать новую таблицу, у меня не было этой проблемы.

Моим последним вариантом, я думаю, будет удаление этого соединителя и создание другого с этой новой конфигурацией, также прослушивающей новую таблицу (myDb.table3).Проблема в том, что если мне нужны исходные данные из myDb.table3, мне нужно будет создать снимок initial, но я не хочу генерировать все сообщения из снимка из других таблиц myDb.table1,myDb.table2.

1 Ответ

0 голосов
/ 30 ноября 2018

Изменения в конфигурации белого / черного списка пока не поддерживаются.В настоящее время эта работа решается (см. DBZ-175 ), и мы надеемся получить поддержку для предварительного просмотра в одном из следующих выпусков.Для этого есть ожидающий PR , который требует немного больше работы.

До тех пор, пока это не будет реализовано, лучше всего настроить новый экземпляр соединителя, который only захватывает дополнительные таблицы, которые вас интересуют. Это достигается ценой запуска двух коннекторов (которые оба будут поддерживать сеанс чтения журнала binlog), но это работает, если вам не нужно менятьваш конфиг фильтра слишком часто.

...