Апач Кафка: поиск и назначение. Надежное чтение с самого начала - PullRequest
0 голосов
/ 31 октября 2018

Я пытаюсь реализовать базовый сценарий, перечитать тему с начала (хотя бы 1 сообщение), и я столкнулся с неожиданным поведением.

Предположим, что в 1 теме раздела содержится ровно 1 миллион сообщений, 1 потребитель со смещением уже зафиксирован где-то посередине, активных производителей нет.

Сначала я попробовал

  consumer.subscribe(Collections.singletonList(topic));
  consumer.seekToBeginning(Collections.emptySet());
  consumer.poll(Duration.ofMillis(longTimeout)); //no loop to simplify

И это не работает (нет опрошенных сообщений). Я читал, что seekToBeginning ленив (и это нормально), но, оказывается, seekToBeginning никак не влияет, потому что ему нужно назначить разделы, что произойдет только при первом опросе. Должно ли это быть описано в документации, или я пропустил это?

Тогда я попробовал

  consumer.subscribe(Collections.singletonList(topic));
  consumer.poll(Duration.ofMillis(assignTimeout));
  consumer.seekToBeginning(Collections.emptySet());
  consumer.poll(Duration.ofMillis(longTimeout));//no loop to simplify

И получается, это зависит от assignTimeout. Этого должно быть достаточно для завершения процесса присоединения. Это время может меняться, и на него нельзя полагаться.

Тогда я предоставил ConsumerRebalanceListener с

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      consumer.seekToBeginning(partitions);
    }

и один poll осталось. И, наконец, это похоже на работу.

Итак, вопросы:

  1. Является ли seekToBeginning сразу после subscribe бесполезным? Должно ли это быть документировано?
  2. Надежно ли решение с ConsumerRebalanceListener? Гарантирует ли это, что никакие сообщения из среднего (зафиксированного смещения) не будут опрошены, прежде чем будет применен поиск?

1 Ответ

0 голосов
/ 31 октября 2018

Для первого :

Вы правильно упомянули это в своем вопросе, что предварительное условие для операций seek() или seekToXXXX() состоит в том, что разделы должны быть назначены. Этого не произойдет, пока мы не примем группу потребителей, и это произойдет, только если мы позвоним poll(). Таким образом, операция seek(), не работающая сразу после subscribe(), является ожидаемым поведением.

Это на самом деле задокументировано в Подробном руководстве Kafka, глава 4 «Потребители Kafka», раздел «Потребление записей с конкретными смещениями».

По второму вопросу :

Да, использование ConsumerRebalanceListener является надежным и рекомендуемым подходом в соответствии с Полным руководством Кафки.

Вот утверждение из той же главы, которое подтверждает то же самое:

Существует много разных способов реализации семантики, выполняемой ровно один раз. ..................., но все они будут необходимо использовать прослушиватель ConsumerRebalance и seek (), чтобы убедиться, смещения хранятся во времени и что потребитель начинает читать сообщения из правильного местоположения.

Надеюсь, это поможет!

...