Я пытаюсь протестировать и оценить поведение Kafka JDBC Sink Connector, когда база данных не работает.
При получении нового сообщения в Kafka, когда база данных не работает, появляется следующая ошибка:
INFO Unable to connect to database on attempt 1/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:91)
com.microsoft.sqlserver.jdbc.SQLServerException: Unable to access availability database 'Giorgos' because the database replica is not in the PRIMARY or SECONDARY role. Connections to an availability database is permitted only when the database replica is in the PRIMARY or SECONDARY role. Try the operation again later.
И после некоторых повторных попыток появится следующая ошибка, и задача будет убита:
ERROR WorkerSinkTask{id=sink-giorgos_test-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
Где можно изменить количество удалений и интервал повторных попыток, который в соответствии с первой ошибкой установлен на 10000
мс?
Скажем, я хочу, чтобы работник продолжал пытаться подключиться к базе данных в течение 5 минут. Какие параметры я должен настроить для этого?
ИЗМЕНИТЬ, чтобы включить необходимые файлы:
sink-file.properties
name=sink-test
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=GIORGOS.TOPIC
connection.url=jdbc:sqlserver://ip:port;DatabaseName=Streaming;user=myuser;password=mypass
auto.create=true
# DB failover
max.retries=10
retry.backoff.ms=10000
pk.mode=record_value
pk.fields=ID
insert.mode=upsert
transforms=ExtractField
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractField.field=data
worker.properties (у меня несколько файлов при работе в распределенном режиме)
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8040
rest.advertised.port=8040
plugin.path=/usr/share/java