Я пытаюсь прочитать последние 3 записи из темы "input_topic".Я использую только одного потребителя.Но это потребляет запись только из одного раздела.
Когда я вручную назначил другие разделы, появляется ошибка «Вы можете проверить положение только для разделов, назначенных этому потребителю».Но я использую одного потребителя.Я не могу понять проблему.Пожалуйста, помогите мне, если это возможно.
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,"4");
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String topic = "input_topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
TopicPartition topicPartition1 = new TopicPartition(topic, 1);
TopicPartition topicPartition2 = new TopicPartition(topic, 2);
List<TopicPartition> topics = Arrays.asList(topicPartition1,topicPartition,topicPartition2);
while (true) {
Thread.sleep(5000);
consumer.assign(topics);
consumer.seekToEnd(topics);
long current = consumer.position(topicPartition);
consumer.seek(topicPartition, current-3);
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("-------------------------------------------> "+ records.count());
System.out.println("-------------------------------------------> "+ LocalDateTime.now());
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
System.out.println("_________________________" + record.partition());
}
}