Как глотать 250 столов в Кафку от MS SQL с Debezium - PullRequest
2 голосов
/ 23 марта 2020

Привет, я пытаюсь построить конвейер соединения Kafka между PostgreSQL в качестве источника и SQL Сервером в качестве пункта назначения. Я использовал 3 брокеров Kafka, и мне нужно использовать 252 темы (одна тема такая же, как одна таблица PostgreSQL). После более чем часа работы он может извлечь только 218 из 252 таблиц. Ошибка, которую я обнаружил, заключается в том, что в SQL сервере есть механизм взаимоблокировки, который может удерживать транзакцию на SQL сервере и пытаться повторить попытку, также есть слот репликации Debezium.

Я использую распределенные соединители с максимум 3 максимумами. Рабочий на мойке, но, возможно, этого недостаточно. Также попробуйте с более высоким offset.time_out.ms до 60000 и более высоким смещенным разделом (100). Боюсь, что это не тот уровень производства, который мне нужен. Кто-нибудь может дать предложение по этому делу? Есть ли какие-либо расчеты, чтобы определить лучшее количество работников, которые мне нужны?

ОБНОВЛЕНИЕ

здесь некоторые ошибки, которые я получаю. Я вижу, что некоторые разъемы убиты. Один говорит мне, что тупик возникает в SQL SERVER :

[2020-03-26 15:06:28,494] ERROR WorkerSinkTask{id=sql_server_sink_XXA-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

ОБНОВЛЕНИЕ 14 апреля 2020

У меня все еще есть проблема, Я забыл рассказать о том, как я развернуть разъемы. Теперь я использую 2 рабочих, один для источника и один для раковины. Я перечисляю все мои таблицы и pk в csv и l oop через строки, чтобы создать соединители без сна или ждать каждую минуту. Я также использую отдельный раздел тем и 3 реплики для каждой темы. Но у меня все еще есть sql тупик соединения с сервером

1 Ответ

1 голос
/ 23 марта 2020

Возможно, проблема в доступе к одной и той же таблице SQL с несколькими задачами в одно и то же время и вызывает проблемы синхронизации, такие как взаимоблокировки, как вы упоминали.
Поскольку у вас уже есть большое количество тем, и ваш соединитель может получать к ним доступ параллельно, я бы посоветовал вам уменьшить количество разделов для каждой топи c до 1 (уменьшение количества разделов не поддерживается в Kafka, поэтому вы должны удалять и создавать заново каждый topi c с новым числом разделов).
Таким образом, каждый topi c имеет только один раздел; доступ к каждому разделу возможен только в одном потоке (/ задача / потребитель), поэтому вероятность параллельных транзакций SQL с одной и той же таблицей отсутствует.

В качестве альтернативы, лучший подход заключается в создании одной топи c с 3 разделами (столько же, сколько у вас есть заданий / потребителей), и в том, чтобы производитель использовал SQL имя таблицы в качестве ключа сообщения .
Kafka гарантирует сообщения с одним и тем же ключом всегда go для одного и того же раздела, поэтому все сообщения с одной и той же таблицей будут находиться в одном разделе (одном потоке). потребляя).

Если вы сочтете это полезным, я могу приложить дополнительную информацию о том, как создать Kafka Producer и отправлять сообщения с ключами.

...