Как очень неоптимизированное подтверждение концепции, это может быть достигнуто следующим образом:
Давайте создадим исполнителя, который сможет выполнять задачи «по требованию» контролируемым образом.
private static class SelfEventLoopExecutor implements Executor {
private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
@Override
public void execute(Runnable command) {
boolean added = queue.add(command);
assert added;
}
public void drainQueue() {
Runnable r;
while ((r = queue.poll()) != null) {
r.run();
}
}
}
Затем создайте подписчика, который сможет использовать исполнителя для выполнения задач в ожидании результата, а не для полной блокировки потока.
public static class LazyBlockingSubscriber<T> implements Subscriber<T> {
private final SelfEventLoopExecutor selfExec;
private volatile boolean completed = false;
private volatile T value;
private volatile Throwable ex;
public LazyBlockingSubscriber(SelfEventLoopExecutor selfExec) {
this.selfExec = selfExec;
}
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onNext(T t) {
value = t;
completed = true;
}
@Override
public void onError(Throwable t) {
ex = t;
completed = true;
}
@Override
public void onComplete() {
completed = true;
}
public T block() throws Throwable {
while (!completed) {
selfExec.drainQueue();
}
if (ex != null) {
throw ex;
}
return value;
}
}
Теперь мы можем изменить код следующим образом
public static void main(String[] args) throws Throwable {
var future = new CompletableFuture<String>();
var selfExec = new SelfEventLoopExecutor(); // our new executor
var res = Mono.fromFuture(future)
.publishOn(Schedulers.fromExecutor(selfExec)) // schedule on the new executor
.map(val -> {
System.out.println("Thread: " + Thread.currentThread().getName());
return val + "1";
});
new Thread(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
future.complete("completed");
}, "completer").start();
var subs = new LazyBlockingSubscriber<String>(selfExec); // lazy subscribe
res.subscribeWith(subs);
subs.block(); // spin wait
}
В результате код печатает Thread: main
.