Как программно получить последнее смещение топики Кафки c в Java - PullRequest
1 голос
/ 21 апреля 2020

Вот то, что я пытаюсь

Collection <TopicPartition> partitions = consumer.partitionsFor(topic).stream();

А также как указать, что вы достигли конца или больше нет сообщений для потребления. Если смещение не соответствует смещению конца брокера в то время, как это сделать.

Любые предложения.

1 Ответ

0 голосов
/ 21 апреля 2020

Чтобы получить последнее смещение, вы можете использовать командную строку:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 \
    --topic topicName

или программно в Java:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    consumer.subscribe(Arrays.asList("topicName"));
        Set<TopicPartition> assignment;
        while ((assignment = consumer.assignment()).isEmpty()) {
            consumer.poll(Duration.ofMillis(500));
        }
        consumer.endOffsets(assignment).forEach((partition, offset) -> System.out.println(partition + ": " + offset));
}

Теперь, если вы хотите чтобы заставить потребителя начать потреблять с последнего смещения, вы можете использовать следующее свойство:

props.put("auto.offset.reset", "latest"); 
// or props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

или заставить его потреблять с последнего смещения, используя

consumer.seekToEnd();

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...