Чтобы получить последнее смещение, вы можете использовать командную строку:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic topicName
или программно в Java:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Arrays.asList("topicName"));
Set<TopicPartition> assignment;
while ((assignment = consumer.assignment()).isEmpty()) {
consumer.poll(Duration.ofMillis(500));
}
consumer.endOffsets(assignment).forEach((partition, offset) -> System.out.println(partition + ": " + offset));
}
Теперь, если вы хотите чтобы заставить потребителя начать потреблять с последнего смещения, вы можете использовать следующее свойство:
props.put("auto.offset.reset", "latest");
// or props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
или заставить его потреблять с последнего смещения, используя
consumer.seekToEnd();