Я думаю, что эта проблема связана с # 539 , но я не знаю, является ли это ошибкой, или пользователь должен сам ее устранить.
Итак, у меня есть группа потребителей, когда я увеличиваю количество потребителей в этой группе, отзыв раздела вызывает следующую ошибку:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:778)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:617)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:584)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1479)
at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$commit(KafkaConsumerActor.scala:430)
at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:210)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
at akka.actor.Timers.aroundReceive(Timers.scala:55)
at akka.actor.Timers.aroundReceive$(Timers.scala:40)
at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Этого не происходит, когда я уменьшаю количество потребителей. Я имею в виду, что пока я этого не наблюдал. Я предполагаю, что это потому, что раздел не отменяется при уменьшении. Оставшийся потребитель просто получает новый раздел.
Обратите внимание, что я делаю групповые сообщения и фиксирую пакетные вещи.
Вот как выглядит мой код
val source = Consumer.committableSource(consumerSettings, subscription)
.async
.groupBy(Int.MaxValue, computeNamedGraph)
.groupedWithin(conf.tripleStoreSettings.batchSize, conf.tripleStoreSettings.batchWindowSec seconds)
.map(toUpdateStatements)
.async
.mergeSubstreams
.map(toHttpRequest)
.map(p => p.data -> p)
.via(poolClientFlow)
.async
.map { case (respone, payload) => Payload(respone, payload.offsets) }
.mapConcat(handleResponse)
.via(Committer.flow(committerDefaults.withMaxBatch(conf.tripleStoreSettings.batchSize)))
val (killSwitch, streamResults) = source
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run()
streamResults.onComplete {
case Success(_) =>
logger.info("Stream finished")
system.terminate()
case Failure(e) =>
logger.error("Stream failed:", e)
system.terminate()
}
Мое решение просто делает следующее:
private val decider: Supervision.Decider = {
e => {
logger.error(s"Stream failed. ${e.getMessage} ${e.getStackTrace.map(_.toString).mkString("\n")}", e)
Supervision.Stop
}
}
Так что, исходя из прочтения # 539, я понимаю, что у меня есть несколько сообщений для подтверждения, и я не могу из-за отзыва. То есть существует некоторый перебаланс, связанный с отзывом, который происходит, когда число потребителей увеличивается.
Мой сервис хотя бы один раз, так что я не против, если другой потребитель обработает это сообщение. у нас нет ограничений на доставку максимум один.
Мой вопрос будет заключаться в том, что до тех пор, пока библиотека не обработает эти ситуации изначально, как я могу в любом случае зафиксировать их, когда произойдет отзыв, или, что еще лучше, просто отбросить их, чтобы потребитель, которому назначен раздел, которому они также принадлежат, обработал их.
Есть предложения? Я проверяю BalanceListener, но я не уверен, как использовать его для этой ситуации.
Примечание Мои конфиги тайм-аута
val subscription = Subscriptions.topicPattern(conf.kafkaConsumer.sourceTopic)
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(conf.kafkaBroker.bootstrapServers)
.withGroupId(conf.kafkaConsumer.groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, conf.kafkaConsumer.offsetReset)
.withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000000")
.withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100000")