Кафка соединяет много-много таблиц в MSSQL - PullRequest
0 голосов
/ 09 сентября 2018

В настоящее время я изучаю Kafka Connect для потоковой передачи некоторых наших баз данных в озеро данных. Чтобы протестировать Kafka Connect, я настроил базу данных с одной из наших баз данных проекта. Пока все хорошо.

Следующим шагом я настроил Kafka Connect с режимом следующих свойств:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "timestamp.column.name": "updated_at,created_at",
  "incrementing.column.name": "id",
  "dialect.name": "SqlServerDatabaseDialect",
  "validate.non.null": "false",
  "tasks.max": "1",
  "mode": "timestamp+incrementing",
  "topic.prefix": "mssql-jdbc-",
  "poll.interval.ms": "10000",
}

Хотя это работает для большинства моих таблиц, в которых я получил идентификатор и поле create_at / updated_at, оно не будет работать для моих таблиц, где я решил свои отношения «многие ко многим» с таблицей между ними и составной таблицей. ключ. Обратите внимание, что я использую стандартную конфигурацию JDBC с драйвером JDBC от Microsoft.

Есть ли способ настроить Kafka Connect для этих особых случаев?

1 Ответ

0 голосов
/ 10 сентября 2018

Вместо одного соединителя, чтобы вытащить все ваши таблицы, вам может понадобиться создать несколько. Это может иметь место, если вы хотите использовать разные методы для извлечения данных или разные столбцы ID / timestamp. Как говорит @cricket_007, вы можете использовать опцию query, чтобы получить результаты запроса - это может быть SELECT, выражающее ваше соединение с несколькими таблицами. Даже при извлечении данных из одного объекта таблицы, сам JDBC-коннектор просто выдает SELECT * из заданной таблицы с предикатом WHERE, чтобы ограничить строки, выбранные на основе увеличивающегося ID / метки времени.

Альтернативой является использование сбора данных изменений на основе журнала (CDC) и передача всех изменений непосредственно из базы данных в Kafka.

Независимо от того, используете ли вы JDBC или CDC на основе журнала, вы можете использовать потоковую обработку для разрешения соединений в самой Kafka. Примером этого является Kafka Streams или KSQL. Я написал о последнем лоте здесь .

Вы также можете найти эту статью полезной, подробно описывающей ваши варианты интеграции баз данных с Kafka.

Отказ от ответственности: я работаю в Confluent, компании, работающей над проектом с открытым исходным кодом KSQL.

...