Я пытаюсь прочитать последнюю запись раздела темы.Производитель темы пишет транзакционно.Потребитель настроен на isolation.level=read_committed
.Я управляю смещениями вручную.Вот мой потребительский код:
// Fetch the end offset of a partition
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(topicPartition));
Long endOffset = endOffsets.get(topicPartition);
// Try to read the last record
if (endOffset > 0) {
consumer.seek(topicPartition, Math.max(0, endOffset - 5));
List records = consumer.poll(10 * 1000).records(topicPartition);
if (records.isEmpty()) {
throw new IllegalStateException("How can this be?");
} else {
// Return the last record
return records.get(records.size() - 1);
}
}
Итак, чтобы прочитать последнюю запись, я запрашиваю конечное смещение, а затем стремлюсь к endOffset - 5
(из-за того, что Кафка пропускает смещения в режиме ровно один раз, как я виделв этом вопросе: Kafka Streams не увеличивает смещение на 1 при создании в тему ) и начинает опрос.
Раньше это работало хорошо, но теперь у меня есть исключение, сообщающее мненулевые записи опрашиваются.Что может быть причиной для этого?Я не могу воспроизвести это, и я думаю, что я потерян.