Kafka Consumer CommitFailedException - PullRequest
       7

Kafka Consumer CommitFailedException

0 голосов
/ 30 марта 2020

Я работаю над потребительской программой kafka. Недавно мы развернули его в среде PROD. Там мы столкнулись с проблемой следующим образом:

[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Discovered group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null)
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Attempt to heartbeat failed for since member id consumer-otm-opl-group-1-953dfa46-9ced-472f-b24f-36d78c6b940b is not valid.
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch start offset: 9329428
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch Processing Successful.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Failing OffsetCommit request since the consumer is not part of an active group
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:936)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1387)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1349)
    at com.cisco.kafka.consumer.RTRKafkaConsumer.main(RTRKafkaConsumer.java:72)

Насколько я понимаю, к тому времени координатор группы недоступен и вновь обнаружен, интервал сердцебиения (3 секунды в соответствии с документацией) истекает, и потребитель выбрасывается из группа. Это правильно?. Если так, что должно быть обойти это? Если я ошибаюсь, пожалуйста, помогите мне разобраться в этой проблеме и предложите любые идеи, которые вам нужно решить, Я могу поделиться кодом, если необходимо.

Это повторяющаяся проблема в течение последних двух недель в среде Prod. Так что быстрая помощь приветствуется!

1 Ответ

0 голосов
/ 30 марта 2020

Исключение, на которое вы ссылаетесь

Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

, дает подсказку о том, что происходит и что можно сделать для решения проблемы.

Проблема:

[...] что время между последующими вызовами poll () было больше, чем настроенный max.poll.interval.ms, что обычно означает, что опрос l oop тратит слишком много времени на обработку сообщений.

Конфигурация max.poll.interval.ms по умолчанию 300000ms или 5minutes. Поскольку ваш потребитель отнимает больше этих 5 минут, он считается сбойным, и группа будет перебалансирована, чтобы переназначить разделы другому участнику (см. Конфигурация потребителя ).

Решение:

Возможное решение также приводится в сообщении об ошибке

Вы можете решить эту проблему, увеличив max.poll.interval.ms или уменьшив максимальный размер пакетов, возвращаемых в poll () с помощью max.poll.records.

Потребитель снова читает все сообщения, поскольку (как показывает ошибка) он не может зафиксировать смещения. Это означает, что если вы запускаете Consumer с тем же group.id, он думает, что он никогда ничего не читает из этой топи c.

Документация Kafka хорошо объясняет Алгоритм перебалансировки потребителя :

Алгоритмы перебалансировки потребителей позволяют всем потребителям в группе прийти к общему мнению о том, какой потребитель потребляет какие разделы. Изменение баланса потребителя запускается при каждом добавлении или удалении как узлов-посредников, так и других потребителей в пределах одной группы. Для данного топи c и данной группы потребителей разделы брокера равномерно распределяются между потребителями внутри группы. Раздел всегда потребляется одним потребителем. Эта конструкция упрощает реализацию. Если бы мы разрешили одновременное использование раздела несколькими потребителями, это привело бы к конфликту в разделе и потребовалась бы какая-то блокировка. Если потребителей больше, чем разделов, некоторые потребители вообще не получат никаких данных. Во время ребалансировки мы стараемся назначать разделы для потребителей таким образом, чтобы уменьшить количество узлов брокера, к которым должен подключаться каждый потребитель.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...