Я использую Kafka Connect в распределенном режиме. Странное поведение, которое я наблюдал несколько раз, заключается в том, что через некоторое время (может быть часы, может быть дни) возникает ошибка балансировки: одни и те же задачи назначаются нескольким работникам. В результате они работают одновременно и, в зависимости от природы разъема, выходят из строя или выдают «непредсказуемые» выходные данные.
Простейшая конфигурация, которую я смог использовать для воспроизведения поведения, - это два сотрудника Kafka Connect,два разъема, каждый разъем только с одной задачей. Kafka Connect развернут в Кубернетес. Кафка сама находится в Confluent Cloud. И Kafka Connect, и Kafka имеют одинаковую версию (5.3.1).
Соответствующие сообщения из журнала:
Рабочий A:
[2019-10-30 12:44:23,925] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,926] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-hdfs-sink, some-mqtt-source], taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
Рабочий B:
[2019-10-30 12:44:23,930] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,936] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-mqtt-source], taskIds=[some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
В приведенных выше выдержках журнала видно, что одно и то же задание (some-mqtt-source-0
) назначено двум работникам. После этого сообщения я также вижу сообщения журнала по экземплярам задач на обоих рабочих.
Это поведение не зависит от разъема (я наблюдал это и с другими задачами). Это также происходит не сразу после запуска рабочих, а только через некоторое время.
Мой вопрос: что может быть причиной такого поведения?
РЕДАКТИРОВАТЬ 1 : Я пытался запустить 3 рабочих вместо двух, думая, что это может быть распределенной проблемой консенсуса. Похоже, что нет, и наличие 3 работников не решает проблему.
РЕДАКТИРОВАТЬ 2 : я заметил, что незадолго до того, как назначен работник A задача, которая первоначально выполнялась на работнике B , этот работник ( B ) обнаружил ошибку при присоединении к группе. Например, если задачи «дублируются» в поколении N, рабочий B не будет иметь сообщения «Успешно присоединенная группа с поколением N» в журналах. Более того, между поколениями N-1 и N + 1 рабочий B обычно регистрирует такие ошибки, как Attempt to heartbeat failed for since member id
и Group coordinator bx-xxx-xxxxx.europe-west1.gcp.confluent.cloud:9092 (id: 1234567890 rack: null) is unavailable or invalid
. Рабочий B обычно присоединяется к поколению N + 1 вскоре после поколения N (иногда всего за 3 секунды). Теперь понятно, что вызывает поведение. Однако:
, хотя я понимаю, что могут быть временные проблемы, подобные этим, и они, вероятно, являются нормальными в общем случае, почему перебалансировка не устраняет проблему после все серверы успешно присоединяются к следующему поколению ? Хотя за этим следует еще больший упадок сил - он неправильно распределяет задачи и сохраняет «дубликаты» навсегда (до тех пор, пока я не перезапущу работников).
кажется, что в некоторые периоды перебалансировка происходит почтиодин раз в несколько часов, а в другие периоды это происходит каждые 5 минут (с точностью до секунд);Что может быть причиной? что нормальное?
в чем может быть причина ошибок «Координатор группы недоступен или недействителен», учитывая, что я использую Confluent Cloud, и есть какие-либо параметры конфигурации, которые можно настроить вKafka Connect, чтобы сделать его более устойчивым к этой ошибке ? Я знаю, что есть session.timeout.ms
и heartbeat.interval.ms
, но документация настолько минималистична, что даже не ясно, каково практическое влияние изменения этих параметров на меньшие или большие значения.
РЕДАКТИРОВАТЬ 3 : я заметил, что проблема не является критической для задач приемника: хотя одни и те же задачи приемника назначаются нескольким рабочим, соответствующие потребители назначаются различным разделам, как обычно, ивсе работает почти как надо - я просто получил больше заданий, чем первоначально просил. Однако в случае исходных задач нарушает поведение - задачи выполняются одновременно и конкурируют за ресурсы на стороне источника.
РЕДАКТИРОВАТЬ 4 :Тем временем я понизил Kafka Connect до версии 2.2 (Confluent Platform 5.2.3) - до «Инкрементной совместной перебалансировки» версии. Работает нормально последние 2 дня. Итак, я предполагаю, что поведение связано с новым механизмом ребалансировки.