Итерации по результатам до условия с Project Reactor в Java - PullRequest
1 голос
/ 16 апреля 2020

Мне нужно перебирать результаты из хранилища данных, пока не обнаружу, что ни один результат не соответствует условиям, которые у меня есть, используя Flux и / или Mono из Project Reactor. У меня есть решение, но оно использует ошибки для контроля потока. Я действительно чувствую, что должно быть более чистое решение, но если оно есть, я не могу его найти. Есть ли лучший способ достичь того, что я хочу, чем это:

import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;

class Scratch {
    public static void main(String[] args) {
        Mono<String> mono = Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
                .map(count -> "FooBar " + count)
                .flatMap(string -> searchExisting(string)
                        .flatMap(exists -> Mono.<String>error(KeyExistsException::new))
                        .switchIfEmpty(Mono.just(string)))
                .retry(t -> t instanceof KeyExistsException);

        System.out.println("Result: " + mono.block());
    }

    private static Mono<String> searchExisting(String test) {
        System.out.println("In searchExisting: " + test);
        return Mono.fromCompletionStage(getCompletableFuture(test));
    }

    private static final List<String> available = asList(
            "FooBar 1", "FooBar 2", "FooBar 3", "FooBar 4", "FooBar 5"
    );

    private static CompletableFuture<String> getCompletableFuture(String test) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("In CompletableFuture: " + test);

            if (available.contains(test)) {
                return test;
            }

            System.out.println("Found no entry");
            return null;
        });
    }
}

class KeyExistsException extends RuntimeException {

}

Я пытался использовать Flux.fromStream(Stream.iterate(1, x -> x + 1)) и идти оттуда, но в этом случае мне все еще нужен контроль потока на основе ошибок и я не могу контролировать скорость, с которой Flux испускает данные, что означает, что я получаю много обращений к хранилищу данных, когда ищу только несколько, вплоть до того, как найду пропущенное значение.

Это похоже на достаточно распространенный вариант использования. Чего мне не хватает?

--- EDIT ---

По предложению Бартоша, альтернативная реализация будет выглядеть так:

Mono<String> mono = Flux.fromStream(Stream.iterate(1, x -> x + 1))
                .map(count -> "FooBar " + count)
                .flatMap(newKey -> Mono.zip(Mono.just(newKey), searchExisting(newKey)))
                .takeUntil(tuple -> !tuple.getT2().isPresent())
                .map(Tuple2::getT1)
                .last();

, то есть lot приятнее, за исключением того, что он не дает никакого контроля над тем, сколько событий генерирует Flux - вы получаете lot попыток найти правильное значение, гораздо больше, чем вы нужно, потому что takeUntil не делает Flux ленивым ...

1 Ответ

1 голос
/ 16 апреля 2020

Если вы открыты для использования Optional s, вы можете объединить обе ваши попытки и сделать что-то вроде

    Mono<String> mono =  Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
            .map(count -> "FooBar " + count)
            .flatMap(newKey -> Mono.zip(Mono.just(newKey), 
                                           searchExisting(newKey)
                                               .map(Optional::ofNullable)
                                               .defaultIfEmpty(Optional.empty())))
            .repeat()
            .takeUntil(tuple -> tuple.getT2().isEmpty())
            .map(Tuple2::getT1)
            .last();

    System.out.println("Result: " + mono.block());

Основная проблема с тем, что вы пытаетесь сделать, это то, что пустые Mono s игнорируется почти всем индикатором switchIfEmpty(), означающим, что ни один из операторов, таких как takeUntil, даже не увидит их.

Обычно это то, что вы хотели бы, но в вашем случае вы хотите действовать на пустой моно. Оборачивая его Optional, он все еще может быть обработан.

...