Мне нужно перебирать результаты из хранилища данных, пока не обнаружу, что ни один результат не соответствует условиям, которые у меня есть, используя Flux и / или Mono из Project Reactor. У меня есть решение, но оно использует ошибки для контроля потока. Я действительно чувствую, что должно быть более чистое решение, но если оно есть, я не могу его найти. Есть ли лучший способ достичь того, что я хочу, чем это:
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Arrays.asList;
class Scratch {
public static void main(String[] args) {
Mono<String> mono = Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
.map(count -> "FooBar " + count)
.flatMap(string -> searchExisting(string)
.flatMap(exists -> Mono.<String>error(KeyExistsException::new))
.switchIfEmpty(Mono.just(string)))
.retry(t -> t instanceof KeyExistsException);
System.out.println("Result: " + mono.block());
}
private static Mono<String> searchExisting(String test) {
System.out.println("In searchExisting: " + test);
return Mono.fromCompletionStage(getCompletableFuture(test));
}
private static final List<String> available = asList(
"FooBar 1", "FooBar 2", "FooBar 3", "FooBar 4", "FooBar 5"
);
private static CompletableFuture<String> getCompletableFuture(String test) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("In CompletableFuture: " + test);
if (available.contains(test)) {
return test;
}
System.out.println("Found no entry");
return null;
});
}
}
class KeyExistsException extends RuntimeException {
}
Я пытался использовать Flux.fromStream(Stream.iterate(1, x -> x + 1))
и идти оттуда, но в этом случае мне все еще нужен контроль потока на основе ошибок и я не могу контролировать скорость, с которой Flux
испускает данные, что означает, что я получаю много обращений к хранилищу данных, когда ищу только несколько, вплоть до того, как найду пропущенное значение.
Это похоже на достаточно распространенный вариант использования. Чего мне не хватает?
--- EDIT ---
По предложению Бартоша, альтернативная реализация будет выглядеть так:
Mono<String> mono = Flux.fromStream(Stream.iterate(1, x -> x + 1))
.map(count -> "FooBar " + count)
.flatMap(newKey -> Mono.zip(Mono.just(newKey), searchExisting(newKey)))
.takeUntil(tuple -> !tuple.getT2().isPresent())
.map(Tuple2::getT1)
.last();
, то есть lot приятнее, за исключением того, что он не дает никакого контроля над тем, сколько событий генерирует Flux
- вы получаете lot попыток найти правильное значение, гораздо больше, чем вы нужно, потому что takeUntil
не делает Flux
ленивым ...