Я работал над кодированием потоков Кафки. Попытка сначала создать несколько примеров для понимания.
Я попытался подписаться на одну тему с кодом ниже Consumer
, и он работает нормально.
try {
while (true) {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// To Retrieve Data in this Consumer
System.out.println("Received message: " + record.value() );
}
}
} finally {
consumer.close();
}
Когда я хочу использовать несколько тем, я попробовал следующий пример:
try {
while (true) {
consumer.subscribe(Arrays.asList(topic, topic1));
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// To Retrieve Data in this Consumer
System.out.println("Received message: " + record.value() );
}
}
} finally {
consumer.close();
}
Но это получение записи только из одной темы, а не из обеих тем. Может кто-нибудь подсказать мне, как я могу использовать записи из более чем одной темы?