Почему Kafka searchToBeginning и seekToEnd не работают с assign? - PullRequest
5 голосов
/ 25 октября 2019

Скажем, я хочу проверить смещение первого и последнего сообщения в Кафке для определенного раздела. Моя идея состояла в том, чтобы использовать метод assign(…) вместе с seekToBeginning(…) и seekToEnd(…). К сожалению, это не работает.

Если я установлю AUTO_OFFSET_RESET_CONFIG на "latest", seekToBeginning(…) не будет иметь никакого эффекта;если я установлю "earliest", seekToEnd(…) не будет работать. Кажется, единственное, что имеет значение для моего потребителя, это AUTO_OFFSET_RESET_CONFIG.

Я видел похожую тему, но проблема касалась subscribe(), а не метода assign(). Предложенное решение состояло в том, чтобы реализовать ConsumerRebalanceListner и передать его в качестве параметра методу subscribe(). К сожалению, метод assign() имеет только одну подпись и может принимать только список тематических разделов.

Вопрос в том, можно ли использовать seekToBeginning() или seekToEnd() с методом assign(). Если да, то как? Если нет, то почему?

Соответствующий фрагмент моего кода:

KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);

consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...

Регистратор печатает смещение n, которое является наибольшим (самым последним) смещением рассматриваемой темы.

1 Ответ

0 голосов
/ 28 октября 2019

Скажем, я хочу проверить смещение первого и последнего сообщения в Кафке для определенного раздела

Вы можете использовать beginningOffsets и endOffsets для этого.

Вопрос: возможно ли использовать seekToBeginning() или seekToEnd() с assign ()

Вы должны вызвать poll() после seekToBeginning или seekToEnd:

Эта функция выполняет ленивый поиск, пытаясь найти первое смещение во всехразделы только тогда, когда опрос (Duration) или позиция (TopicPartition) называется

...