Выполните шаги преобразования в блокирующем потоке для Mono - PullRequest
0 голосов
/ 15 июня 2019

Есть ли способ обеспечить выполнение всех шагов преобразования для одного Mono, созданного из будущего, в потоке, который подписывается и блокирует?

Например, следующий код

public static void main(String[] args) {
  var future = new CompletableFuture<String>();
  var res = Mono.fromFuture(future).map(val -> {
    System.out.println("Thread: " + Thread.currentThread().getName());
    return val + "1";
  });

  new Thread(() -> {
    try {
      Thread.sleep(1000L);
    } catch (InterruptedException e) {
    }
    future.complete("completed");
  }, "completer").start();

  res.block();
}

печатает Thread: completer, потому что будущее завершено из потока "завершителя". Я пытаюсь выяснить, есть ли способ заставить его всегда печатать Thread: main.

Ответы [ 2 ]

2 голосов
/ 16 июня 2019

Нет.Когда поток main блокируется через .block(), этот поток специально ожидает сигналы потока onNext, onComplete или onError (после того, как все вышестоящие операторы выполнились).Он не каким-то образом восстанавливает управление до вызова операторов восходящего потока для выполнения операторов.

Самое близкое, что вы можете сделать, это убедиться, что:

  1. подписка выполняется для определенного Scheduler (через .subscribeOn) И
  2. значение завершения будущего публикуется в том же Scheduler (через .publishOn).

Например:

Scheduler scheduler = Schedulers.parallel();
var res = Mono.fromFuture(future)
        .doFirst(() -> {   // Note: doFirst added in 3.2.10.RELEASE
            // prints a thread in the parallel Scheduler (specified by subscribeOn below)
            System.out.println("Subscribe Thread: " + Thread.currentThread().getName());
        })
        // specifies the Scheduler on which the the completion value
        // from above is published for downstream operators
        .publishOn(scheduler)
        .map(val -> {
            // prints a thread in the parallel Scheduler (specified by publishOn above)
            System.out.println("Operator Thread: " + Thread.currentThread().getName()); 
            return val + "1";
        })
        // specifies the Scheduler on which  upstream operators are subscribed
        .subscribeOn(scheduler);

Однако обратите внимание на следующее:

  • Подписка происходит в потоке в Scheduler, не заблокированный main поток.
  • Этот подход просто гарантирует, что используется тот же Scheduler, а не тот же Thread в Scheduler.Теоретически вы можете форсировать одно и то же Thread, используя однопоточный планировщик (например, Schedulers.newParallel("single-threaded", 1))
  • .publishOn не заставляет , что все операторыоперируйте этим Scheduler.Он действует только на следующих операторов до следующего .publishOn или до следующего асинхронного оператора (такого как .flatMap), который может использовать другой Scheduler.
0 голосов
/ 16 июня 2019

Как очень неоптимизированное подтверждение концепции, это может быть достигнуто следующим образом:

Давайте создадим исполнителя, который сможет выполнять задачи «по требованию» контролируемым образом.

private static class SelfEventLoopExecutor implements Executor {
  private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

  @Override
  public void execute(Runnable command) {
    boolean added = queue.add(command);
    assert added;
  }

  public void drainQueue() {
    Runnable r;
    while ((r = queue.poll()) != null) {
      r.run();
    }
  }
}

Затем создайте подписчика, который сможет использовать исполнителя для выполнения задач в ожидании результата, а не для полной блокировки потока.

public static class LazyBlockingSubscriber<T> implements Subscriber<T> {
  private final SelfEventLoopExecutor selfExec;
  private volatile boolean completed = false;
  private volatile T value;
  private volatile Throwable ex;

  public LazyBlockingSubscriber(SelfEventLoopExecutor selfExec) {
    this.selfExec = selfExec;
  }

  @Override
  public void onSubscribe(Subscription s) {
    s.request(1);
  }

  @Override
  public void onNext(T t) {
    value = t;
    completed = true;
  }

  @Override
  public void onError(Throwable t) {
    ex = t;
    completed = true;
  }

  @Override
  public void onComplete() {
    completed = true;
  }

  public T block() throws Throwable {
    while (!completed) {
      selfExec.drainQueue();
    }
    if (ex != null) {
      throw ex;
    }
    return value;
  }
}

Теперь мы можем изменить код следующим образом

public static void main(String[] args) throws Throwable {
  var future = new CompletableFuture<String>();

  var selfExec = new SelfEventLoopExecutor(); // our new executor

  var res = Mono.fromFuture(future)
      .publishOn(Schedulers.fromExecutor(selfExec))  // schedule on the new executor
      .map(val -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
        return val + "1";
      });

  new Thread(() -> {
    try {
      Thread.sleep(1000L);
    } catch (InterruptedException e) {
    }
    future.complete("completed");
  }, "completer").start();

  var subs = new LazyBlockingSubscriber<String>(selfExec); // lazy subscribe
  res.subscribeWith(subs);
  subs.block(); // spin wait
}

В результате код печатает Thread: main.

...