KafkaConsumer: использовать все доступные сообщения один раз и выйти - PullRequest
0 голосов
/ 25 сентября 2018

Я хочу создать KafkaConsumer с Kafka 2.0.0, который будет использовать все доступные сообщения один раз и немедленно завершить работу.Это немного отличается от стандартной консольной утилиты-потребителя, потому что эта утилита ожидает указанного времени ожидания для новых сообщений и завершает работу только по истечении этого времени ожидания.

Эта, казалось бы, простая задача кажется удивительнойтрудно использовать KafkaConsumer.Моя внутренняя реакция была следующим псевдокодом:

consumer.assign(all partitions)
consumer.seekToBeginning(all partitions)
do
  result = consumer.poll(Duration.ofMillis(0))
  // onResult(result)
while result is not empty

Однако это не работает, так как poll всегда возвращает пустую коллекцию, даже если в теме много сообщений.

Изучая это, похоже, что одной из причин могло быть то, что назначение / подписка считалось ленивым , а разделы не назначались до тех пор, пока не завершился цикл poll (хотя я не могу найти никакой поддержки для этого утверждения вдокументы).Однако следующий псевдокод также возвращает пустую коллекцию при каждом вызове poll:

consumer.assign(all partitions)
consumer.seekToBeginning(all partitions)
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// deprecated poll also returns nothing
result = consumer.poll(0)
// returns nothing
result = consumer.poll(0)
// returns nothing
result = consumer.poll(0)
...

Так что ясно, что «лень» не является проблемой.

Javadoc утверждает:

Этот метод немедленно возвращается, если есть доступные записи.

, что, по-видимому, подразумевает, что первый псевдокод выше должен работать.Однако это не так.

Единственное, что, кажется, работает, это указать ненулевое время ожидания для poll, а не просто любое ненулевое значение, например, 1 не работает,Это указывает на то, что внутри poll происходит некоторое недетерминированное поведение, которое предполагает, что poll всегда будет выполняться в бесконечном цикле, и не имеет значения, что оно иногда возвращает пустую коллекцию, несмотря на наличие сообщений.Код , кажется, подтверждает это различными вызовами, чтобы проверить, истек ли тайм-аут, разбросанный по всей реализации poll и его вызываемым абонентам.

Так что при наивном подходе более длительное время ожидания очевиднотребуется (и в идеале Long.MAX_VALUE, чтобы избежать недетерминированного поведения более короткого интервала опроса), но, к сожалению, это заставит потребителя заблокировать последний опрос, что нежелательно в этой ситуации.При наивном подходе у нас теперь есть компромисс между тем, насколько детерминированным мы хотим, чтобы поведение было, и тем, как долго мы должны ждать без причины в последнем опросе.Как нам этого избежать?

Ответы [ 2 ]

0 голосов
/ 25 сентября 2018

Если никто не производит одновременно, вы также можете использовать endOffsets, чтобы получить позицию последнего сообщения и потреблять до этого.

Итак, в псевдокоде:

long currentOffset = -1
long endOffset = consumer.endOffset(partition)
while (currentOffset < endOffset) {
  records = consumer.poll(NONTRIVIAL_TIMEOUT) // discussed in your answer
  currentOffset = records.offsets().max()
}

Thisкак избежать окончательного ненулевого зависания, так как мы всегда уверены, что есть, что получить.

Возможно, вам придется добавить защитные меры, если позиция вашего потребителя равна конечному смещению (так как вы не получили бы там сообщений).

Также, возможно, вы захотите установить max.poll.records в 1, чтобы не использовать сообщения, расположенные после end offset, если кто-то производит параллельно.

0 голосов
/ 25 сентября 2018

Кажется, что единственный способ сделать это - использовать дополнительную логику, которая самостоятельно управляет смещениями.Вот псевдокод:

consumer.assign(all partitions)
consumer.seekToBeginning(all partitions)
// record the current ending offsets and poll until we get there
endOffsets = consumer.endOffsets(all partitions)

do
  result = consumer.poll(NONTRIVIAL_TIMEOUT)
  // onResult(result)
while given any partition p, consumer.position(p) < endOffsets[p]

и реализация в Kotlin:

val topicPartitions = consumer.partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }
consumer.assign(topicPartitions)
consumer.seekToBeginning(consumer.assignment())

val endOffsets = consumer.endOffsets(consumer.assignment())
fun pendingMessages() = endOffsets.any { consumer.position(it.key) < it.value }

do {
  records = consumer.poll(Duration.ofMillis(1000))
  onResult(records)
} while(pendingMessages())

Теперь продолжительность опроса может быть установлена ​​на разумное значение (например, 1 с), не заботясь опропущенные сообщения, поскольку цикл продолжается до тех пор, пока потребитель не достигнет конечных смещений, определенных в начале цикла.

Существует еще один угловой случай, с которым он справляется правильно: если конечные смещения изменились, но фактически естьнет сообщений между текущим смещением и конечным смещением, тогда опрос будет заблокирован и время ожидания.Поэтому важно, чтобы время ожидания не было установлено слишком низким (в противном случае потребитель получит время ожидания до получения сообщений, которые доступны ), и оно также не должно быть установлено слишком высоким (в противном случае потребителю потребуется слишком много времени для ожиданияпри получении сообщений, которые недоступны ).Последняя ситуация может возникнуть, если эти сообщения были удалены или тема была удалена и воссоздана.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...