У меня есть приложение для потокового воспроизведения, мне нужно подписаться на несколько тем для чтения и обработки данных. Теперь для тестирования я добавил две темы.
Но я следующий код читает данные только из одной темы. Я попытался отправить данные из пользовательского кода, а также через пользовательские пакетные файлы Kafka. Если я создаю несколько прямых потоков по одному для каждой темы, я могу читать все темы. Но у меня есть тысячи тем, поэтому я хочу создать только один прямой поток и предоставить набор тем. У кого-нибудь есть идея, что я могу делать неправильно?
Collection<String> topicList = new ArrayList<>();
topicList.add("A");
topicList.add("B");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<
String, String> Subscribe(topicList, kafkaParams));
for(String topic:topicList)
{
JavaDStream<ConsumerRecord<String, String>> filteredStreams = stream.filter(msg -> {
return msg.topic().equals(topic);
});
JavaPairDStream<String, String> kafkaMessages = filteredStreams.mapToPair(new KafkaConsumerFunction());
process(kafkaMessages );
}