Режим слияния JDBC - PullRequest
       11

Режим слияния JDBC

1 голос
/ 29 мая 2019

Я использую пользовательский запрос в соединителе исходного кода JDBC kafka. Кто-нибудь может мне сказать, каков режим на момент использования настраиваемого запроса в исходном соединителе JDBC kafka, если я использую режим массовой рассылки, тогда он снова вставит все данные в тему kafka.,примечание: -у меня не было никакого первичного ключа или столбца метки времени в моей таблице.

Ответы [ 2 ]

0 голосов
/ 07 июня 2019

Если у вас нет столбца метки времени или увеличивающегося идентификатора, вы не можете выполнять CDC на основе запросов, вы можете выполнять только групповую загрузку.

Ваша альтернатива - использовать основанный на логах CDC с таким инструментом, как Debezium.

В этом выступлении подробно рассматриваются все опции и доступные инструменты: http://rmoff.dev/ksny19-no-more-silos

0 голосов
/ 29 мая 2019

Вы можете использовать либо инкремент или отметку времени

incrementing - используйте строго увеличивающийся столбец в каждой таблице, чтобы обнаруживать только новые строки. Обратите внимание, что это не будет обнаруживать изменения или удаления существующих строк.

timestamp - использовать временную метку (или как метка времени) для обнаружения новых и измененных строк. Это предполагает столбец обновляется при каждой записи, и значения монотонно увеличивающийся, но не обязательно уникальный.

timestamp+incrementing - использовать два столбца, столбец отметки времени, который обнаруживает новые и измененные строки и строго увеличивающийся столбец, который обеспечивает глобально уникальный идентификатор для обновлений, поэтому каждая строка может быть назначена уникальное смещение потока.

Пример для timestamp:

name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products

mode=timestamp
timestamp.column.name=last_modified

topic.prefix=mysql-test-

Пример для incrementing:

name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products

mode=incrementing
incrementing.column.name=id

topic.prefix=mysql-test-

Пример для timestamp+incrementing:

name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products

mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=last_modified

topic.prefix=mysql-test-
...