Как сохранить элементы отфильтрованного потока - PullRequest
2 голосов
/ 16 октября 2019

Я использую реактор с активной зоной: 3.3.0.RELEASE и хочу сохранить некоторые элементы Flux, отфильтрованные по предикату.

Я только что придумал следующий код, однако я знаю,это неэффективно и выглядит неловко, потому что он применяет Predicate дважды.

Queue<Integer> queue = Queues.<Integer> small().get();
Predicate<Integer> evenNumberPredicate = i -> i % 2 == 0;

Flux.range(1, 50)
    .map(i -> {
        if (!evenNumberPredicate.test(i)) {
            queue.offer(i);  // odd numbers are queued: want to use them later
        }
        return i;
    })
    .filter(evenNumberPredicate)
    .subscribe(System.out::println); // even numbers are printed

Я хотел бы написать что-то вроде этого;

Flux.range(1, 50)
    .filterOr(evenNumberPredicate, i -> queue.offer(i))
    .subscribe(System.out::println);

Есть идеи? спасибо.

Ответы [ 2 ]

1 голос
/ 16 октября 2019

Вы можете сделать это с оператором groupBy.

Flux.range(1, 50)
        .groupBy(i -> i % 2)
        .flatMap(groupedFlux -> groupedFlux.key() == 0 ?
                groupedFlux :
                groupedFlux.doOnNext(queue::offer).ignoreElements())
        .subscribe(System.out::println);
0 голосов
/ 16 октября 2019

Я думаю, что в этом случае я бы просто использовал один тройной flatMap() вызов:

Flux.range(1, 50)
        .flatMap(i -> i%2==0 ?
                Mono.just(i) :
                Mono.just(i).doOnNext(queue::offer).ignoreElement())
        .subscribe(System.out::println);

В любом случае, ignoreElement() внутри flatMap() вызова - ваш друг здесь.

...