Spring Flux (или Java Stream) объединены в левое соединение - PullRequest
0 голосов
/ 08 февраля 2019

Как я могу объединить эти два потока

val a: Flux<String> = Flux.just("foo", "bar", "baz", "foobar")
val b: Flux<Pair<String, Int>> = Flux.just( Pair("foo", 5), Pair("baz", 5))

, чтобы получить еще один поток как этот?

val c: Flux<Pair<String, Int>> = Flux.just(
    Pair("foo", 5),
    Pair("bar", 0),
    Pair("baz", 5),
    Pair("foobar",0)
)

Проще говоря, мне нужен весь элемент А, сзначения B, если существует, ноль / ноль в противном случае, как sql left join

Я смотрю на zip, zipWith, merge, но я немного запутался.Любой намек?Спасибо

Ответы [ 2 ]

0 голосов
/ 09 февраля 2019
     Flux.from(a)
                .map(s -> {
                    return b.filter(strPair -> s.equals(strPair.fst)).switchIfEmpty(Flux.just(Pair.of(s,0)));
                }).flatMap(Flux::next).log().subscribe();

вывод:

10:26:00.358 [main] INFO reactor.Flux.FlatMap.3 - request(unbounded)
10:26:00.591 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[foo,5])
10:26:00.593 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[bar,0])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[baz,5])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onNext(Pair[foobar,0])
10:26:00.594 [main] INFO reactor.Flux.FlatMap.3 - onComplete()

0 голосов
/ 08 февраля 2019

Не очень знаком с Kotlin, но, поскольку вы также запросили какое-либо потоковое решение, вы можете нарисовать сходство со следующим примером:

List<String> list = List.of("foo", "bar", "baz", "foobar");
List<Pair<String, Integer>> pairs = List.of(Pair.of("foo", 5), Pair.of("baz", 5));
List<Pair<String, Integer>> result = list.stream()
    .map(a ->
        pairs.stream()
            .filter(p -> p.getLeft().equals(a))
            .findFirst()
            .orElse(Pair.of(a, 0)))
    .collect(Collectors.toList());
...