Кафка-одиночный потребитель для чтения из темы, имеющей несколько разделов с заданным смещением - PullRequest
1 голос
/ 20 июня 2019

У меня есть тема kafka с 2 разделами, я хочу создать потребителя для чтения темы с заданным смещением, ниже приведен пример кода, который я использую для чтения с заданным смещением, равным 9

Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

KafkaConsumer kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for(TopicPartition topicPartition: partitions) {
            consumer.seek(topicPartition, 9);
        }
    }
});

try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.topic() + ", " + record.partition() + ", " + record.offset() + ", " + record.value());
        }
    }
}catch(WakeupException ex){
    System.out.println("Exception caught " + ex.getMessage());
}finally{
    kafkaConsumer.close();
}

Но я вижу следующую ошибку

org.apache.kafka.clients.consumer.internals.Fetcher -  Fetch offset 9 is out of range for parition test-topic_partitions-0, resetting offset
org.apache.kafka.clients.consumer.internals.Fetcher -  Resetting offset for partition test-topic_partitions-0 to offset 

Я использую зависимость ниже maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

А также упомянутое значение delete.retention.ms, настроенное для этой темы, равно 86400000 (которое1 день) и retention.ms настроен как 172800000 (что составляет 2 дня)

Может кто-нибудь помочь, как устранить ошибку?

1 Ответ

1 голос
/ 20 июня 2019

Эта ошибка означает, что раздел не имеет записи со смещением 9. Так что либо:

  • в данный момент у вас меньше 9 записей в разделе
  • записей как минимум на9 были удалены политикой хранения

Вы можете использовать endOffsets() и beginningOffsets(), чтобы найти самые маленькие и самые большие смещения в вашем разделе,Если вы попытаетесь найти смещение вне этого диапазона, политика сброса auto.offset.reset сработает, чтобы найти действительное смещение.

...