Работа на пересечении с Flux - Project Reactor - PullRequest
3 голосов
/ 27 мая 2020

Допустим, у меня есть и var a = Flux.just("A", "B", "C"), и var b = Flux.just("B", "C", "D")

Я хочу иметь возможность пересекать обе переменные, и результат должен быть эквивалентен пересечению множества

Что-то вроде a.intersect(b) или Flux.intersect(a, b), что приведет к (Flux of) ["B", "C"]

Я не смог найти ни одной операции, которая выполняет это, какие-либо идеи?

Ответы [ 3 ]

0 голосов
/ 28 мая 2020

Вы можете использовать join, filter, map и group Таким образом метод

public <T> Flux<T> intersect(Flux<T> f1,Flux<T> f2){
    return f1.join(f2,f ->Flux.never(),f-> Flux.never(),Tuples::of)
            .filter(t -> t.getT1().equals(t.getT2()))
            .map(Tuple2::getT1)
            .groupBy(f -> f)
            .map(GroupedFlux::key);
}

//Use on it's own
intersect(a,b).subscribe(System.out::println)

//Or with existing flux
a.transform(f -> intersect(a,f)).subscribe(System.out::println)
0 голосов
/ 28 мая 2020

Мне нравится эффективность, поэтому я предпочитаю использовать то, что доказано, без чрезмерной зависимости от потоковых (или потоковых) операций.

Недостатком этого является необходимость собрать один из потоков в отсортированный список. Возможно, вы заранее узнаете, короче ли один Flux. Однако мне кажется, что им придется сделать это, несмотря ни на что, поскольку вам нужно сравнивать каждый элемент потока A со всеми элементами потока B (или, по крайней мере, пока вы не найдете совпадение).

Итак, соберите Flux A в отсортированный список, и тогда нет причин не использовать Collections::binarySearch для вашего собранного / отсортированного потока.

    a.collectSortedList()
    .flatMapMany(sorteda -> b.filter(be->Collections.binarySearch(sorteda, be)>=0))
    .subscribe(System.out::println);
0 голосов
/ 27 мая 2020

Мой предпочтительный подход будет примерно таким:

Flux.merge(a, b)
        .groupBy(Function.identity())
        .filterWhen(g -> g.count().map(l -> l>1))
        .map(g -> g.key())
        .subscribe(System.out::print); //Prints "BC"

(Если a или b могут содержать дубликаты, замените первую строку на Flux.merge(a.distinct(), b.distinct()).)

Каждый В publisher играют только один раз, и при необходимости легко расширить его до более чем двух издателей.

...