Кафка непрерывной ребалансировки при медленном потреблении - PullRequest
0 голосов
/ 12 июня 2019

Я пытался проверить нашу Кафку на наличие некоторых негативных сценариев, и один из них - очень медленный потребитель. Я установил Thread.sleep(15000) в моем методе @KafkaListener (это spring-kafka) и установил параллелизм на 3. У меня есть 1 тема с 1 разделом. Я поместил 10 сообщений в тему и запустил сервис. Когда 3 потребителя начинают, они все получают до (Re-)joining group пункта, но тогда только один из них (предположим, что это consumer-2) доберется до:

Successfully joined group with generation X

И начните медленно потреблять сообщения.

(кстати, я использую режим подтверждения MANUAL_IMMEDIATE, но он воспроизводим, даже когда я не добавляю аргумент Acknowledgement в слушатель и не подтверждаю сообщения). Я вижу следующее: До тех пор, пока все сообщения не будут обработаны потребителем-2, каждые 3 секунды (интервал пульса по умолчанию) Я получил сообщение в консоли:

AbstractCoordinator$HeartbeatResponseHandler: [Consumer clientId=consumer-2, groupId=pixel-group] Attempt to heartbeat failed since group is rebalancing

Интересно, почему это происходит? Только после того, как все 10 сообщений будут обработаны, произойдет еще одна перебалансировка, после которой все 3 потребителя напечатают:

Successfully joined group with generation X

И одному из них будет назначен раздел, и проблем с сердцебиением больше не будет. Это происходит только тогда, когда я устанавливаю интервал сна на значение выше, чем интервал сердцебиения. Обычно это происходит один раз, когда все потребители запускаются, но вскоре после этого они успешно настроятся.

Итак, если подвести итог, то кажется, что:

Если время обработки потребителем > время интервала сердцебиения - все, кроме первого потребителя, не могут завершить восстановление баланса (вероятно, они не могут разговаривать со своим медленным Лидером). Я не понимаю, почему эта ошибка сердцебиения такая постоянная? Почему остальные потребители не могут закончить балансировку где-то между потреблением сообщений Лидера, если сон длится дольше, чем сердцебиение?

UPDATE Кафка версия 2.12-2.2.0 Spring-Kafka 2.2.3. РЕЛИЗ

1 Ответ

0 голосов
/ 12 июня 2019

... параллелизм к 3. У меня 1 тема с 1 разделом ...

Вам нужно как минимум столько же разделов, сколько и потребителей - раздел может быть использован только одним потребителем.

Какую версию вы используете? Начиная с KIP-62 (Kafka 0.10.1.0), сердцебиения отправляются в фоновом режиме клиентами-кафками. Таким образом, восстановление баланса должно происходить только в том случае, если слушателю требуется больше времени, чем max.poll.interval.ms. Google KIP-62 для получения дополнительной информации.

EDIT

Вы должны видеть логи вот так, пока ваш слушатель спит ...

2019-06-13 09:47:52.008 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Heartbeat thread started
...
2019-06-13 09:47:52.072  INFO 61914 --- [           main] com.example.Rbgh664Application           : Sleeping for 15
2019-06-13 09:47:55.120 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2019-06-13 09:47:55.121 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Sending HEARTBEAT {group_id=rbgh664,generation_id=45,member_id=source-82297cee-063a-4e0d-89d0-15cfbd6ef680} with correlation id 10 to node 2147483647
2019-06-13 09:47:55.226 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Completed receive from node 2147483647 for HEARTBEAT with correlation id 10, received {throttle_time_ms=0,error_code=0}
2019-06-13 09:47:55.227 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Received successful Heartbeat response
2019-06-13 09:47:58.120 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2019-06-13 09:47:58.120 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Sending HEARTBEAT {group_id=rbgh664,generation_id=45,member_id=source-82297cee-063a-4e0d-89d0-15cfbd6ef680} with correlation id 11 to node 2147483647
2019-06-13 09:47:58.225 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Completed receive from node 2147483647 for HEARTBEAT with correlation id 11, received {throttle_time_ms=0,error_code=0}
2019-06-13 09:47:58.226 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Received successful Heartbeat response
2019-06-13 09:48:01.203 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2019-06-13 09:48:01.204 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Sending HEARTBEAT {group_id=rbgh664,generation_id=45,member_id=source-82297cee-063a-4e0d-89d0-15cfbd6ef680} with correlation id 12 to node 2147483647
2019-06-13 09:48:01.310 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Completed receive from node 2147483647 for HEARTBEAT with correlation id 12, received {throttle_time_ms=0,error_code=0}
2019-06-13 09:48:01.310 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Received successful Heartbeat response
2019-06-13 09:48:04.285 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2019-06-13 09:48:04.286 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Sending HEARTBEAT {group_id=rbgh664,generation_id=45,member_id=source-82297cee-063a-4e0d-89d0-15cfbd6ef680} with correlation id 13 to node 2147483647
2019-06-13 09:48:04.390 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Completed receive from node 2147483647 for HEARTBEAT with correlation id 13, received {throttle_time_ms=0,error_code=0}
2019-06-13 09:48:04.390 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Received successful Heartbeat response
20
...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...