Потребитель Kafka только потребляет сообщения во втором звонке - PullRequest
0 голосов
/ 13 декабря 2018

Я настраиваю потребителя kafka следующим образом:

private Consumer<String, HistoryEvent> createConsumer(Configuration configuration) {
    Properties baseProperties = getKafkaBaseProperties(configuration);
    baseProperties.remove("key.serializer");
    baseProperties.remove("value.serializer");
    baseProperties.setProperty("group.id", String.format("%s-group", applicationTopicName));

    return new KafkaConsumer<>(baseProperties);
}

private void setupConsumer() {
    topicConsumer.subscribe(Collections.singleton(applicationTopicName), consumerListener);
}

Затем я пытаюсь запросить полный журнал изменений темы у потребителя следующим образом:

topicConsumer.seekToBeginning(consumerListener.getActivePartitions());
List<HistoryEvent> events = new ArrayList<>();
ConsumerRecords<String, HistoryEvent> records = topicConsumer.poll(Duration.ofSeconds(1));
while (!records.isEmpty() || hasPendingMessages.apply(topicConsumer.endOffsets(consumerListener.getActivePartitions()).entrySet())) {
    events.addAll(StreamSupport.stream(records.spliterator(), false)
        .filter(record -> flowId.equals(record.key()))
        .map(ConsumerRecord::value)
        .collect(Collectors.toList())
    );

    records = topicConsumer.poll(Duration.ofSeconds(1));
}

InВ общем, этот код работает и возвращает все документы в теме, однако он делает это только после второго вызова.Как видно из кода, имеется прослушиватель разделов раздела, который получает уведомление о назначении или отзыве разделов у потребителя.Я добавил к этому отладчик и могу наблюдать следующее:

  • topicConsumer.subscribe -> Нет действий в слушателе раздела
  • Первый вызов , где яя пытаюсь получить все элементы -> cusomerListener.getActivePartitions() пусто, потому что слушатель для назначений темы никогда не вызывался с назначениями.Согласно исходному коду seekToBeginning это означает, что он будет искать все назначения.
  • topicConsumer.poll() вызывается.Во время этого вызова мой слушатель вызывается в onPartitionsRevoked с пустой коллекцией разделов (??)
  • Он не возвращает записей, а поскольку активных разделов нет, hasPendingMessages, который проверяет смещения разделов относительноend offsets также возвращает false -> никакие события не сохраняются вообще
  • Второй вызов для того же метода -> consumerListener.getActivePartitions() все еще пуст, поскольку до сих пор он вызывался только с пустымonPartitionsRevoked
  • topiConsumer.poll() называется.Во время этого вызова мой слушатель вызывается в onPartitionsAssigned с единственным разделом, существующим для темы
  • topicConsumer.poll(), возвращает все доступные записи
  • Последующие вызовы также возвращают все записи

Почему мне нужно проходить это дважды каждый раз, когда я перезапускаю свое приложение?Я хочу, чтобы все события в теме уже в начале ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...