Spark Streaming читает данные только из одной темы кафки - PullRequest
0 голосов
/ 14 января 2019

У меня есть приложение для потокового воспроизведения, мне нужно подписаться на несколько тем для чтения и обработки данных. Теперь для тестирования я добавил две темы. Но я следующий код читает данные только из одной темы. Я попытался отправить данные из пользовательского кода, а также через пользовательские пакетные файлы Kafka. Если я создаю несколько прямых потоков по одному для каждой темы, я могу читать все темы. Но у меня есть тысячи тем, поэтому я хочу создать только один прямой поток и предоставить набор тем. У кого-нибудь есть идея, что я могу делать неправильно?

Collection<String> topicList = new ArrayList<>();   
topicList.add("A");
topicList.add("B");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<
                    String, String> Subscribe(topicList, kafkaParams));

for(String topic:topicList)
{
    JavaDStream<ConsumerRecord<String, String>> filteredStreams = stream.filter(msg -> {
        return msg.topic().equals(topic);
    });

    JavaPairDStream<String, String> kafkaMessages = filteredStreams.mapToPair(new KafkaConsumerFunction());
    process(kafkaMessages );
}
...