Скажем, я хочу проверить смещение первого и последнего сообщения в Кафке для определенного раздела. Моя идея состояла в том, чтобы использовать метод assign(…)
вместе с seekToBeginning(…)
и seekToEnd(…)
. К сожалению, это не работает.
Если я установлю AUTO_OFFSET_RESET_CONFIG
на "latest"
, seekToBeginning(…)
не будет иметь никакого эффекта;если я установлю "earliest"
, seekToEnd(…)
не будет работать. Кажется, единственное, что имеет значение для моего потребителя, это AUTO_OFFSET_RESET_CONFIG
.
Я видел похожую тему, но проблема касалась subscribe()
, а не метода assign()
. Предложенное решение состояло в том, чтобы реализовать ConsumerRebalanceListner
и передать его в качестве параметра методу subscribe()
. К сожалению, метод assign()
имеет только одну подпись и может принимать только список тематических разделов.
Вопрос в том, можно ли использовать seekToBeginning()
или seekToEnd()
с методом assign()
. Если да, то как? Если нет, то почему?
Соответствующий фрагмент моего кода:
KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);
consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...
Регистратор печатает смещение n, которое является наибольшим (самым последним) смещением рассматриваемой темы.