Я настраиваю потребителя kafka следующим образом:
private Consumer<String, HistoryEvent> createConsumer(Configuration configuration) {
Properties baseProperties = getKafkaBaseProperties(configuration);
baseProperties.remove("key.serializer");
baseProperties.remove("value.serializer");
baseProperties.setProperty("group.id", String.format("%s-group", applicationTopicName));
return new KafkaConsumer<>(baseProperties);
}
private void setupConsumer() {
topicConsumer.subscribe(Collections.singleton(applicationTopicName), consumerListener);
}
Затем я пытаюсь запросить полный журнал изменений темы у потребителя следующим образом:
topicConsumer.seekToBeginning(consumerListener.getActivePartitions());
List<HistoryEvent> events = new ArrayList<>();
ConsumerRecords<String, HistoryEvent> records = topicConsumer.poll(Duration.ofSeconds(1));
while (!records.isEmpty() || hasPendingMessages.apply(topicConsumer.endOffsets(consumerListener.getActivePartitions()).entrySet())) {
events.addAll(StreamSupport.stream(records.spliterator(), false)
.filter(record -> flowId.equals(record.key()))
.map(ConsumerRecord::value)
.collect(Collectors.toList())
);
records = topicConsumer.poll(Duration.ofSeconds(1));
}
InВ общем, этот код работает и возвращает все документы в теме, однако он делает это только после второго вызова.Как видно из кода, имеется прослушиватель разделов раздела, который получает уведомление о назначении или отзыве разделов у потребителя.Я добавил к этому отладчик и могу наблюдать следующее:
topicConsumer.subscribe
-> Нет действий в слушателе раздела - Первый вызов , где яя пытаюсь получить все элементы ->
cusomerListener.getActivePartitions()
пусто, потому что слушатель для назначений темы никогда не вызывался с назначениями.Согласно исходному коду seekToBeginning
это означает, что он будет искать все назначения. topicConsumer.poll()
вызывается.Во время этого вызова мой слушатель вызывается в onPartitionsRevoked
с пустой коллекцией разделов (??) - Он не возвращает записей, а поскольку активных разделов нет,
hasPendingMessages
, который проверяет смещения разделов относительноend offsets также возвращает false -> никакие события не сохраняются вообще - Второй вызов для того же метода ->
consumerListener.getActivePartitions()
все еще пуст, поскольку до сих пор он вызывался только с пустымonPartitionsRevoked
topiConsumer.poll()
называется.Во время этого вызова мой слушатель вызывается в onPartitionsAssigned
с единственным разделом, существующим для темы topicConsumer.poll()
, возвращает все доступные записи - Последующие вызовы также возвращают все записи
Почему мне нужно проходить это дважды каждый раз, когда я перезапускаю свое приложение?Я хочу, чтобы все события в теме уже в начале ...