Debezium Sql Server Connector не получает обновления после некоторого неактивного времени - PullRequest
0 голосов
/ 30 октября 2019

У меня работает докер-контейнер с соединителем сервера Kafka-connect и debezium sql.

Когда я запускаю соединитель, он делает снимок базы данных и отправляет сообщениям kafka о данных, которые есть в базе данных. в этот момент.

Если я начинаю делать обновления, вставляет и удаляет в базе данных сразу после этих снимков и сообщений, отправляемых на kafka, коннектор полностью реагирует и получает обновления базы данных.

Если я оставил коннектор работающим без каких-либо изменений в базе данных, например, 30 минут, 1 час, похоже, что поток коннектора спит, и для получения новых обновлений требуется некоторое время. В последнем тесте, который я проводил, коннектору потребовалось 20 минут, чтобы проснуться и получить новые обновления, но как только он проснулся, он полностью реагирует.

Я обнаружил вопрос в переполнении стека, похожем на этот, но для разъема debezium mongodb. Решение, описанное в этом вопросе, не работает для меня, потому что я не могу перезапустить задачу коннектора, потому что я не знаю, когда произойдут изменения в базе данных, потому что это устаревшее приложение, которое вносит изменения в базу данных.

Кто-нибудь знаетЛюбая другая альтернатива, чтобы держать задачу соединителя синхронизированной / реагировать на изменения базы данных?

Конфигурация соединителя:

{
    "name": "myConnector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.history.kafka.topic": "asw.myConnector",
        "internal.key.converter.schemas.enable": "false",
        "table.whitelist": "dbo.user,dbo.test_table1,dbo.test_table2",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "tombstones.on.delete": "false",
        "schema.registry.url": "http://xxxx:8081",
        "schema.registry.basic.auth.credentials.source": "USER_INFO",
        "database.history.kafka.recovery.poll.interval.ms": "20000",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "errors.log.enable": "true",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "database.user": "sa",
        "database.dbname": "myDb",
        "database.history.kafka.bootstrap.servers": "xxxx:9092",
        "database.server.name": "asw",
        "database.port": "1433",
        "key.converter.basic.auth.user.info": "xxxx",
        "value.converter.schema.registry.url": "http://xxxx:8081",
        "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.basic.auth.user.info": "xxxx",
        "database.hostname": "xxxx",
        "database.password": "xxxx",
        "internal.value.converter.schemas.enable": "false",
        "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "schema.registry.basic.auth.user.info": "xxxx",
        "key.converter.schema.registry.url": "http://xxxx:8081",
        "key.converter.basic.auth.credentials.source": "USER_INFO",
        "transforms": "InsertTenantId, InsertInstanceId, ValueToKey",
        "transforms.InsertTenantId.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertTenantId.static.field": "tenant_id",
        "transforms.InsertTenantId.static.value": "xxxx",
        "transforms.InsertInstanceId.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertInstanceId.static.field": "instance_id",
        "transforms.InsertInstanceId.static.value": "xxxx",
        "transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.ValueToKey.fields":"tenant_id,instance_id"
    }
}
...