Как прочитать одну запись из каждой темы Кафки? - PullRequest
0 голосов
/ 22 марта 2019

У меня есть Kafka Consumer:

public void consumeKafka(Collection<String> topics) {

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
    consumer = new KafkaConsumer<>(props);

    consumer.subscribe(topics);

    int i = 0;
    int iterationValue = 5;

        while (i++ < iterationValue) {
            ConsumerRecords<Object, Object> records = consumer.poll(1000);
            System.out.println(records.partitions());
        }
}

Мне нужно прочитать по одной записи из каждой темы.Как я могу это сделать?Когда я указываю значение props.put (ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1), я ничего не получаю.

PS с большим пулом, у меня разные результаты в консоли на каждой итерации.Данные присутствуют.

Пример результата System.out.println (records.partitions ());:

1 [] 
2 [TSDocumentExpress-0, TSDocMSTask-0] 
3 [TSDocRouteSheet-0, TSDocTMSTask-0] 
4 [TSDoctTransferStatus-0, TSDocRouteSheet-0] 
5 [TSDocTransferStatus-0]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...