Я хочу создать KafkaConsumer
с Kafka 2.0.0, который будет использовать все доступные сообщения один раз и немедленно завершить работу.Это немного отличается от стандартной консольной утилиты-потребителя, потому что эта утилита ожидает указанного времени ожидания для новых сообщений и завершает работу только по истечении этого времени ожидания.
Эта, казалось бы, простая задача кажется удивительнойтрудно использовать KafkaConsumer
.Моя внутренняя реакция была следующим псевдокодом:
consumer.assign(all partitions)
consumer.seekToBeginning(all partitions)
do
result = consumer.poll(Duration.ofMillis(0))
// onResult(result)
while result is not empty
Однако это не работает, так как poll
всегда возвращает пустую коллекцию, даже если в теме много сообщений.
Изучая это, похоже, что одной из причин могло быть то, что назначение / подписка считалось ленивым , а разделы не назначались до тех пор, пока не завершился цикл poll
(хотя я не могу найти никакой поддержки для этого утверждения вдокументы).Однако следующий псевдокод также возвращает пустую коллекцию при каждом вызове poll
:
consumer.assign(all partitions)
consumer.seekToBeginning(all partitions)
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// deprecated poll also returns nothing
result = consumer.poll(0)
// returns nothing
result = consumer.poll(0)
// returns nothing
result = consumer.poll(0)
...
Так что ясно, что «лень» не является проблемой.
Javadoc утверждает:
Этот метод немедленно возвращается, если есть доступные записи.
, что, по-видимому, подразумевает, что первый псевдокод выше должен работать.Однако это не так.
Единственное, что, кажется, работает, это указать ненулевое время ожидания для poll
, а не просто любое ненулевое значение, например, 1
не работает,Это указывает на то, что внутри poll
происходит некоторое недетерминированное поведение, которое предполагает, что poll
всегда будет выполняться в бесконечном цикле, и не имеет значения, что оно иногда возвращает пустую коллекцию, несмотря на наличие сообщений.Код , кажется, подтверждает это различными вызовами, чтобы проверить, истек ли тайм-аут, разбросанный по всей реализации poll
и его вызываемым абонентам.
Так что при наивном подходе более длительное время ожидания очевиднотребуется (и в идеале Long.MAX_VALUE
, чтобы избежать недетерминированного поведения более короткого интервала опроса), но, к сожалению, это заставит потребителя заблокировать последний опрос, что нежелательно в этой ситуации.При наивном подходе у нас теперь есть компромисс между тем, насколько детерминированным мы хотим, чтобы поведение было, и тем, как долго мы должны ждать без причины в последнем опросе.Как нам этого избежать?