max.poll.records
параметр по умолчанию равен 500. Так что иногда возможно не получить все сообщения из всех разделов в топи c одним опросом ().
max.poll.records : максимум количество записей, возвращаемых в одном вызове poll ().
Кстати, наличие только одного потребителя в группе не является подходящим способом использования topi c с разделами. Ваше количество потребителей в группе потребителей должно быть равно количеству разделов в topi c, подписанных в лучших практиках. (Kafka назначает разделы потребителям по умолчанию равномерно) В противном случае вы не можете масштабировать нагрузку по горизонтали, и в этом случае наличие разделов не имеет особого смысла.
Kafka всегда назначает разделы потребителям. Невозможно иметь раздел, который не назначен потребителю. (Если подписано это topi c)
Но в вашем случае, поскольку вы покидаете потребителя, требуется некоторое время (session.timeout.ms
), чтобы Kafka считал этого потребителя мертвым. Если вы снова запускаете потребителя, не дожидаясь прохождения session.timeout.ms
, то Кафка понимает, что в группе потребителей есть два активных потребителя, и равномерно назначает разделы этому двум потребителям. (например: разделы 0, 1, 2 для потребителя-1 и разделы 3, 4, 5 для потребителя-2) Но после того, как Кафка осознает, что один из потребителей мертв, в группе потребителей начинается перебалансировка, и все разделы назначаются один активный потребитель в группе потребителей.
session.timeout.ms : время ожидания, используемое для обнаружения сбоев клиента при использовании средства управления группой Kafka. Клиент периодически посылает биения, чтобы указать его жизнеспособность брокеру. Если посредник не получил тактовые импульсы до истечения этого тайм-аута сеанса, то брокер удалит этого клиента из группы и инициирует перебалансировку. Обратите внимание, что значение должно находиться в допустимом диапазоне, настроенном в конфигурации посредника для group.min.session.timeout.ms и group.max.session.timeout.ms
Вы можете проверить текущий раздел назначение для вашей группы потребителей с помощью этой команды cli на стороне брокера:
./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group yourConsumerGroup