У меня работает докер-контейнер с соединителем сервера Kafka-connect и debezium sql.
Когда я запускаю соединитель, он делает снимок базы данных и отправляет сообщениям kafka о данных, которые есть в базе данных. в этот момент.
Если я начинаю делать обновления, вставляет и удаляет в базе данных сразу после этих снимков и сообщений, отправляемых на kafka, коннектор полностью реагирует и получает обновления базы данных.
Если я оставил коннектор работающим без каких-либо изменений в базе данных, например, 30 минут, 1 час, похоже, что поток коннектора спит, и для получения новых обновлений требуется некоторое время. В последнем тесте, который я проводил, коннектору потребовалось 20 минут, чтобы проснуться и получить новые обновления, но как только он проснулся, он полностью реагирует.
Я обнаружил вопрос в переполнении стека, похожем на этот, но для разъема debezium mongodb. Решение, описанное в этом вопросе, не работает для меня, потому что я не могу перезапустить задачу коннектора, потому что я не знаю, когда произойдут изменения в базе данных, потому что это устаревшее приложение, которое вносит изменения в базу данных.
Кто-нибудь знаетЛюбая другая альтернатива, чтобы держать задачу соединителя синхронизированной / реагировать на изменения базы данных?
Конфигурация соединителя:
{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.history.kafka.topic": "asw.myConnector",
"internal.key.converter.schemas.enable": "false",
"table.whitelist": "dbo.user,dbo.test_table1,dbo.test_table2",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"tombstones.on.delete": "false",
"schema.registry.url": "http://xxxx:8081",
"schema.registry.basic.auth.credentials.source": "USER_INFO",
"database.history.kafka.recovery.poll.interval.ms": "20000",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"errors.log.enable": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"database.user": "sa",
"database.dbname": "myDb",
"database.history.kafka.bootstrap.servers": "xxxx:9092",
"database.server.name": "asw",
"database.port": "1433",
"key.converter.basic.auth.user.info": "xxxx",
"value.converter.schema.registry.url": "http://xxxx:8081",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.basic.auth.user.info": "xxxx",
"database.hostname": "xxxx",
"database.password": "xxxx",
"internal.value.converter.schemas.enable": "false",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.registry.basic.auth.user.info": "xxxx",
"key.converter.schema.registry.url": "http://xxxx:8081",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"transforms": "InsertTenantId, InsertInstanceId, ValueToKey",
"transforms.InsertTenantId.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTenantId.static.field": "tenant_id",
"transforms.InsertTenantId.static.value": "xxxx",
"transforms.InsertInstanceId.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertInstanceId.static.field": "instance_id",
"transforms.InsertInstanceId.static.value": "xxxx",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields":"tenant_id,instance_id"
}
}