Как развернуть соединение приемника kafka с несколькими темами и назначением таблицы - PullRequest
0 голосов
/ 15 апреля 2020

Исходя из моего предыдущего вопроса , я решил дать больше согласия на развертывание потребителей для синхронизации базы данных в реальном времени с распределенной Kafka. Тот же случай; У меня есть более сотни таблиц, которые я хочу перенести с PostgreSQL на SQL Сервер. От PostgreSQL до Кафки я использовал разъемы Debezium с плагинами wal2 json. А с Kafka на SQL Server я использую JDB C Connectors. У меня есть три идентичных посредника по настройкам (разные адреса):

broker.id=0
broker.rack=1
port=9093
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://localhost:9093
log.dir=/home/admin/kafka/tmp/kafka_log1
offsets.topic.num.partition=1
offsets.topic.replication.factor=3
min.isnyc.replicas=2
default.replication.factor=3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=7200000
delete.topic.enable=true
message.max.bytes=50497182 
replica.fetch.max.bytes=50497182
group.max.session.timeout.ms=7200000

Я попробовал какое-то возможное решение, подобное этому:

  1. Установить темы для использования 1 раздела и 3 реплик. Поскольку в моей таблице указано _, я получаю предупреждение об этом.
kafka-topics.sh -create --bootstrap-server localhost:9093,localhost:9094,localhost:9095  --replication-factor 3 --partitions 1 --topic $topic_name --config retention.ms=5400000
Разделяю разъемы debezium и jdb c с разными работниками. У меня есть два рабочих с одинаковой конфигурацией (за исключением хост-порта, 8085 для дебезиума и 8084 для приемника), например:
bootstrap.servers=localhost:9093,localhost:9094,localhost:9095
group.id=debezium-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets-debezium
offset.storage.replication.factor=3
config.storage.topic=connect-configs-debezium
status.storage.topic=connect-status-debezium
producer.buffer.memory=29999999
producer.max.buffered.records=19999999
producer.max.request.size=51497182 
producer.retries=100
producer.max.in.flight.requests.per.connection=1
producer.request.timeout.ms=20000
producer.enable.idempotence=true
producer.retry.backoff.ms=500
producer.send.buffer.bytes=50497182
producer.receive.buffer.bytes=50497182
producer.ack=1
offset.flush.timeout.ms=300000
producer.buffer.memory=51497182
consumer.enable.auto.commit=true
consumer.retries=100
consumer.auto.commit.interval.ms=100000
consumer.max.partition.fetch.bytes=50497182
consumer.max.poll.records=10000
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=50000
consumer.session.timeout.ms=50000
consumer.auto.offset.reset=latest
consumer.isolation.level=read_committed
consumer.max.poll.interval.ms=5400000
fetch_max_bytes=50497182
rest.port=8085
plugin.path=/home/admin/kafka/connectors
L oop коннекторы приемника один за другим без:
#!/bin/bash
CSV_LIST="/home/admin/kafka/main/config/tables/table_lists.csv"
DATA=${CSV_LIST}

while IFS=',' read table pk mode; do
topic_name=${table} 
curl -X POST http://localhost:8084/connectors -H 'Content-Type:application/json' -d '{"name" :"sqlservercon_'$topic_name'",
    "config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics":"'$table'",
            "connection.url":"jdbc:sqlserver://-:1433",
            "connection.user":"-",
            "connection.password":"-",
            "transforms":"unwrap",
            "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones":"false",
            "auto.create":"true",
            "insert.mode":"'$mode'",
            "pk.fields":" '$pk'",
            "pk.mode":"record_value",
            "destination.table.format":"db.dbo.'$table'"
}}' | jq
done < ${DATA}

Вот как я это разверну:

  1. Запуск сервера zookeeper и kafka
  2. Создание тем
  3. Запуск рабочих kafka для источника Debezium
  4. Добавление разъемов debezium (поскольку 1 дБ требуется только один разъем)
  5. Запуск рабочих kafka для мойки
  6. Добавление коннекторов jdb c от l oop

К сожалению, меня все еще не устраивает перемещение всех данных в новую SQL серверную базу данных из-за нескольких случаев взаимоблокировки и неосведомленности потребителей , Я хочу знать, есть ли хорошее предложение для оптимального развертывания потребителя. Нужно ли добавлять по одному рабочему для каждого разъема или выполнять переключение между темами?

1 Ответ

0 голосов
/ 16 апреля 2020

У меня есть проверка, что я думаю, что из-за того, что Kafka connect jdb c использует batch.record для упорядоченного количества записей, которые должны быть отправлены на сервер SQL, возникает проблема, когда я использую upsert с большим размером записи. Я предполагаю, что должен уменьшить партию до 1, как в источнике, так и в приемнике. Это еще предварительный ответ. А также, если кто-то знает, как показать запрос SQL, используемый для вставки в Kafka connect JDB C, мне будет полезно узнать механизм поведения JDB C и как решить тупик.

И лучший опыт, насколько я знаю, если целевая база данных существует, но нет таблицы внутри, это определять приоритеты, какая таблица должна быть вставлена ​​первой, и ждать, пока она не будет выполнена, и не использовать вставку. Для таблицы менее 100000 строк могут быть сгруппированы в одну группу, но таблица больших размеров должна извлекаться отдельно.

...