KStream отправляет записи в несколько потоков (не в Branch) - PullRequest
0 голосов
/ 06 января 2019

Есть ли способ сделать операцию, подобную ветви, но поместить запись в каждый выходной поток, предикат которого оценивается как true? Brach помещает запись в первое совпадение (документация: запись помещается в один и только один выходной поток при первом совпадении).

Ответы [ 2 ]

0 голосов
/ 06 января 2019

Вы можете «транслировать» и фильтровать каждый поток индивидуально:

KStream stream = ...

stream1 = stream.filter(...);
stream2 = stream.filter(...);
// and so on...

Если вы используете переменную stream несколько раз, все записи передаются всем нисходящим фильтрам (или любому другому оператору), т. Е. Каждый фильтр выполняется для каждой записи.

0 голосов
/ 06 января 2019

Я думаю, вы можете использовать что-то вроде этого:

KStream<String, String> inputStream = builder.stream("input");
List<Predicate<String, String>> predicates = new ArrayList<>(); // <-- list of predicates
List<KStream<String, String>> kStreams = predicates.stream()
        .map(inputStream::branch)
        .map(Arrays::asList)
        .map(listOfOneElementKStreams -> listOfOneElementKStreams.get(0)).collect(Collectors.toList());
...