Чтение записи последнего раздела Kafka, записанного в транзакции - PullRequest
1 голос
/ 07 марта 2019

Я пытаюсь прочитать последнюю запись раздела темы.Производитель темы пишет транзакционно.Потребитель настроен на isolation.level=read_committed.Я управляю смещениями вручную.Вот мой потребительский код:

// Fetch the end offset of a partition
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(topicPartition));
Long endOffset = endOffsets.get(topicPartition);

// Try to read the last record
if (endOffset > 0) {
    consumer.seek(topicPartition, Math.max(0, endOffset - 5));
    List records = consumer.poll(10 * 1000).records(topicPartition);

    if (records.isEmpty()) {
        throw new IllegalStateException("How can this be?");
    } else {
        // Return the last record
        return records.get(records.size() - 1);
    }
}

Итак, чтобы прочитать последнюю запись, я запрашиваю конечное смещение, а затем стремлюсь к endOffset - 5 (из-за того, что Кафка пропускает смещения в режиме ровно один раз, как я виделв этом вопросе: Kafka Streams не увеличивает смещение на 1 при создании в тему ) и начинает опрос.

Раньше это работало хорошо, но теперь у меня есть исключение, сообщающее мненулевые записи опрашиваются.Что может быть причиной для этого?Я не могу воспроизвести это, и я думаю, что я потерян.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...