Максимальное количество повторных попыток и интервал повторных попыток для Kafka JDBC Sink Connector при неработающей базе данных - PullRequest
0 голосов
/ 26 апреля 2018

Я пытаюсь протестировать и оценить поведение Kafka JDBC Sink Connector, когда база данных не работает.

При получении нового сообщения в Kafka, когда база данных не работает, появляется следующая ошибка:

INFO Unable to connect to database on attempt 1/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:91)
com.microsoft.sqlserver.jdbc.SQLServerException: Unable to access availability database 'Giorgos' because the database replica is not in the PRIMARY or SECONDARY role. Connections to an availability database is permitted only when the database replica is in the PRIMARY or SECONDARY role. Try the operation again later. 

И после некоторых повторных попыток появится следующая ошибка, и задача будет убита:

ERROR WorkerSinkTask{id=sink-giorgos_test-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)

Где можно изменить количество удалений и интервал повторных попыток, который в соответствии с первой ошибкой установлен на 10000 мс?

Скажем, я хочу, чтобы работник продолжал пытаться подключиться к базе данных в течение 5 минут. Какие параметры я должен настроить для этого?

ИЗМЕНИТЬ, чтобы включить необходимые файлы:

sink-file.properties

name=sink-test
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=GIORGOS.TOPIC
connection.url=jdbc:sqlserver://ip:port;DatabaseName=Streaming;user=myuser;password=mypass
auto.create=true

# DB failover
max.retries=10
retry.backoff.ms=10000

pk.mode=record_value
pk.fields=ID
insert.mode=upsert
transforms=ExtractField
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractField.field=data 

worker.properties (у меня несколько файлов при работе в распределенном режиме)

bootstrap.servers=localhost:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081



config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1



internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

rest.port=8040
rest.advertised.port=8040

plugin.path=/usr/share/java

Ответы [ 2 ]

0 голосов
/ 17 июля 2019

Подключение приемника JDBC имеет те же параметры, что и подключение источника. Следующее используется для попытки подключения

connection.attempts
Maximum number of attempts to retrieve a valid JDBC connection.

Type: int
Default: 3
Importance: low

connection.backoff.ms
Backoff time in milliseconds between connection attempts.

Type: long
Default: 10000
Importance: low

ссылка: https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html#database

https://github.com/confluentinc/kafka-connect-jdbc/blob/v5.3.0-rc4/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java#L43

0 голосов
/ 27 апреля 2018

Как объяснить здесь: https://docs.confluent.io/current/connect/connect-jdbc/docs/sink_config_options.html#retries

Вы можете настроить 2 свойства в конфигурации коннектора:

max.retries=30
retry.backoff.ms=10000

Здесь он будет повторяться 30 раз с ожиданием 10 секунд между каждой попыткой (= 300 секунд = 5 минут)

...