У меня есть тема kafka с 2 разделами, я хочу создать потребителя для чтения темы с заданным смещением, ниже приведен пример кода, который я использую для чтения с заданным смещением, равным 9
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition topicPartition: partitions) {
consumer.seek(topicPartition, 9);
}
}
});
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + ", " + record.partition() + ", " + record.offset() + ", " + record.value());
}
}
}catch(WakeupException ex){
System.out.println("Exception caught " + ex.getMessage());
}finally{
kafkaConsumer.close();
}
Но я вижу следующую ошибку
org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset 9 is out of range for parition test-topic_partitions-0, resetting offset
org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test-topic_partitions-0 to offset
Я использую зависимость ниже maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
А также упомянутое значение delete.retention.ms
, настроенное для этой темы, равно 86400000 (которое1 день) и retention.ms
настроен как 172800000 (что составляет 2 дня)
Может кто-нибудь помочь, как устранить ошибку?