Почему все разделы не привязаны к одному запущенному KafkaConsumer? - PullRequest
1 голос
/ 19 сентября 2019

Я пытаюсь прочитать последние 3 записи из темы "input_topic".Я использую только одного потребителя.Но это потребляет запись только из одного раздела.

Когда я вручную назначил другие разделы, появляется ошибка «Вы можете проверить положение только для разделов, назначенных этому потребителю».Но я использую одного потребителя.Я не могу понять проблему.Пожалуйста, помогите мне, если это возможно.

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,"4");
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String topic = "input_topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
TopicPartition topicPartition1 = new TopicPartition(topic, 1);    
TopicPartition topicPartition2 = new TopicPartition(topic, 2);
List<TopicPartition> topics = Arrays.asList(topicPartition1,topicPartition,topicPartition2);
while (true) {
  Thread.sleep(5000);
  consumer.assign(topics);
  consumer.seekToEnd(topics);

  long current = consumer.position(topicPartition);
  consumer.seek(topicPartition, current-3);
  ConsumerRecords<String, String> records = consumer.poll(100);
  System.out.println("-------------------------------------------> "+ records.count());
  System.out.println("-------------------------------------------> "+ LocalDateTime.now());
  for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
    System.out.println("_________________________" + record.partition());
  }
}

1 Ответ

1 голос
/ 20 сентября 2019

Полагаю ... кроме того, что Хатидже говорила о assign, который должен быть сделан вне цикла только один раз, я вижу это из вашего кода.Вы ищите позицию в конце всех разделов темы, но затем вы ищите смещение для последних 3 записей только для раздела 0. В этот момент опрос может использовать только эти 3 записи из раздела 0 темы ине из других разделов, потому что ваша позиция на них заканчивается (конечно, это правда, если вы не отправляете больше сообщений в эти разделы).

...