Сообщения Kafka потребляются несколько раз разными потребителями с одинаковым идентификатором группы - PullRequest
0 голосов
/ 06 июня 2019

Я использую Кафку с веточкой Java. Я вижу, что сообщения, которые создаются только один раз, потребляются несколько раз.

Иногда сообщение используется только один раз, а другое - несколько раз. Согласно моим журналам, обработка сообщений начинается в очень близкое время с разницей в несколько миллисекунд. Я вижу, что обработка сообщений происходит несколько раз без ошибок и заканчивается до 'max.poll.interval.ms'.

Иногда обработка выполняется в одном и том же служебном модуле в разных потоках, а иногда - в разных. Все капсулы имеют одинаковый идентификатор группы. Это происходит во всех темах этой группы Id.

Кроме того, я постоянно вижу много следующих ошибок:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar!/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1327) ~[spring-kafka-2.1.5.RELEASE.jar!/:2.1.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1188) [spring-kafka-2.1.5.RELEASE.jar!/:2.1.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) [spring-kafka-2.1.5.RELEASE.jar!/:2.1.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]

и

Фиксация не может быть завершена, так как группа уже перебалансировала и присвоила разделы другому участнику. Это означает, что время между последующими вызовами poll () было больше, чем настроенный max.poll.interval.ms, что обычно подразумевает, что цикл опроса тратит слишком много времени на обработку сообщений. Вы можете решить эту проблему, увеличив тайм-аут сеанса или уменьшив максимальный размер пакетов, возвращаемых в poll (), с помощью max.poll.records.

Мои потребительские конфигурации kafka:

              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
                ackEachRecord: true
                configuration:
                  max.poll.records: 20
                  max.poll.interval.ms: 1000000

Заранее спасибо за любую помощь !!!

...