У меня есть Kafka Consumer:
public void consumeKafka(Collection<String> topics) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);
int i = 0;
int iterationValue = 5;
while (i++ < iterationValue) {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
System.out.println(records.partitions());
}
}
Мне нужно прочитать по одной записи из каждой темы.Как я могу это сделать?Когда я указываю значение props.put (ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1), я ничего не получаю.
PS с большим пулом, у меня разные результаты в консоли на каждой итерации.Данные присутствуют.
Пример результата System.out.println (records.partitions ());:
1 []
2 [TSDocumentExpress-0, TSDocMSTask-0]
3 [TSDocRouteSheet-0, TSDocTMSTask-0]
4 [TSDoctTransferStatus-0, TSDocRouteSheet-0]
5 [TSDocTransferStatus-0]