Потребитель Кафки зависает на неопределенное время после восстановления баланса - PullRequest
0 голосов
/ 24 мая 2019

Я пытаюсь использовать потребительскую библиотеку kafka, которая написана в моей организации.Он берет данные JSON из раздела Kafka и сохраняет их в базе данных Mongo.Хотя я не могу опубликовать этот код, это очень простая архитектура, которая использует маршруты Apache Camel, а затем сохраняет использованные сообщения в Mongo, используя зависимость Springboot Mongo.

Я сталкиваюсь с ситуацией, когда при развертывании в OpenShift и масштабировании более 1 модуля получено исключение, указанное ниже, а затем приложение зависает без каких-либо дополнительных действий или обработки.Я полагаю, что сбой происходит в логике, которая находится в клиентской библиотеке (ях) kafka.

Я попытался запустить два экземпляра приложения локально, через разные порты.Это прекрасно работает без ошибок.Я попытался установить интервал сердцебиения, время ожидания сеанса, размер пакета, максимальное количество байтов выборки, количество одновременных потребителей, включение / выключение режима SEDA и время ожидания запроса.При изменении этих настроек Kafka вверх, вниз, вкл, выкл и неопределенное, проблемы остаются.

2019-05-23 16:15:51 [Camel (camel-1) thread #1 - KafkaConsumer[mytopic]] ERROR o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group mytopic-status
2019-05-23 16:15:51 [Camel (camel-1) thread #7 - KafkaConsumer[mytopic]] ERROR o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group mytopic-status
2019-05-23 16:15:51 [Camel (camel-1) thread #7 - KafkaConsumer[mytopic]] WARN  o.a.c.component.kafka.KafkaConsumer - Error consuming mytopic-Thread 0 from kafka topic. Caused by: [org.apache.kafka.clients.consumer.CommitFailedException - Commit cannot be completed due to group rebalance]
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
111
...