Я хотел бы создать Flux, который передает новое состояние следующей операции в соответствии с будущим результатом, который он получает в текущем процессе.
Я уже реализовал sth следующим образом:
public static Flux<MyResult> create(MyHttpClient myClient) {
final MyGenerator generator = new MyGenerator(myClient);
return Flux.generate(Optional::empty, generator);
}
private final static class MyGenerator implements BiFunction<Optional<String>, SynchronousSink<MyResult>, Optional<String>> {
private final MyHttpClient myHttpClient;
private MyGenerator(MyHttpClient myHttpClient) {
this.myHttpClient = myHttpClient;
}
@Override
public Optional<String> apply(Optional<String> lastState, SynchronousSink<Object> objectSynchronousSink) {
MyResult myResult = lastState.map((ls) -> myHttpClient.blockingRequestWithState(ls)).orElseGet(() -> myHttpClient.blockingRequestForInitial());
if (myResult.isFinished()) {
sink.complete();
return Optional.empty();
}
sink.next(myResult);
return Optional.of(myResult.getLastState());
}
}
Однако я хотел бы избежать блокировки потока, вызвав методы 'myHttpClient.blockingRequest ...', сделать запрос асинхронно и передать это будущее значение как lastState.
Есть ли у вас какие-либо предложения для достижения этой цели?