Как использовать сообщения из нескольких тем? - PullRequest
0 голосов
/ 17 июня 2019

Я пытаюсь использовать сообщения из нескольких тем, используя метод assign(). С моей реализацией, иногда я могу использовать сообщения из всех тем, а иногда только из одной темы. После некоторых исследований я обнаружил, что Kafka по умолчанию использует присваиватель Range. Следовательно, он не будет назначать все разделы всегда.

В моем случае я должен использовать все темы и разделы.

Я попытался, установив присваиватель RoundRobin. Но это не помогло

List<TopicPartition> topicPartitions = new ArrayList<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerConfig);
for (String topic : topics) {
   topicPartitions.add(new TopicPartition(topic, 0);
}
kafkaConsumer.assign(topicPartitions);
ConsumerRecords<String, String> records = kafkaConsumer.poll(600);`

Ответы [ 2 ]

1 голос
/ 17 июня 2019

KafkaConsumer.assign обычно используется для сложных случаев, когда вы хотите контролировать не только темы, но и разделы, которые вы используете. Если вы просто хотите использовать несколько тем (и все их разделы), вы должны использовать KafkaConsumer.subscribe.

consumer.subscribe(Arrays.asList("topic1", "topic2"));

Посмотрите на javadoc javadoc , который также показывает примеры кода.

РЕДАКТИРОВАТЬ: если вам нужно управлять назначением разделов, то вам действительно нужно использовать метод assign (), но в вашем (неполном) примере кода это выглядит так, как будто вы назначаете раздел 0 каждой темы; поэтому вы будете принимать сообщения только из раздела 0.

Если вам нужно управлять смещением вручную, вы все равно можете использовать подписку, но вы можете отключить автоматическую фиксацию и использовать seek () и commitSync () или commitAsync () для управления смещением.

0 голосов
/ 17 июня 2019

Следующие действия помогут вам:

consumer.subscribe(Arrays.asList("mytopic1","mytopic2"), ConsumerRebalanceListener obj)
// or consumer.subscribe(Arrays.asList(topic1,topic2), new ConsumerRebalanceListener ..)

ConsumerRecords<String, String> records = consumer.poll(600);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + " -> " + record.value());
    }
}
...