Часть 1. Наблюдение против подписки
Рассматривая вопрос, я вижу необходимость наблюдать элементов после выполнения в определенном потоке. Чтобы быть точным, наблюдение в этом контексте означает * возможность работать со значением в потоке в некотором указанном c потоке. В Rx Java у нас есть правильный оператор, который называется точно так же, но в Project Reactor мы вызываем такую же операцию, как publishOn
.
Таким образом, *, если вы хотите обработать данные * на Schedulers.boundedElastic()
, тогда вы должны использовать следующую конструкцию
Mono.fromFuture(..)
.publishOn(Schedulers.boundedElastic())
НО Подождите, .subscribeOn
также сработало ???
Читая предыдущую конструкцию, вы можете начать беспокоиться, потому что вы на 100% уверены, что
Mono.fromRunnable(..)
.subscribeOn(Schedulers.boundedElastic())
отправляет onNext
в потоке boundedElastic-1
, так что не так с тем же fromFuture
.
, и тут возникает хитрость:
Никогда не используйте subscribeOn
с Futures
/ CompletableFuture
или чем-либо, что может использовать собственный асин c механизм под
Если мы посмотрим на то, что происходит за subscribeOn
, вы обнаружит что-то вроде следующего:
// Simplified version of SubscribeOn operator
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Scheduler scheduler;
Publisher<T> parent;
scheduler.schedule(() -> parent.subscribe(actual));
}
Что в основном означает, что метод subscribe
родителя будет вызываться в отдельном потоке.
Такая техника работает для fromRunnable
, fromSupplier
, fromCallable
потому что их логика c происходит в методе subscribe
:
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Operators.MonoSubscriber<T, T>
sds = new Operators.MonoSubscriber<>(actual);
actual.onSubscribe(sds);
// skiped some parts
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
}
, что означает s оно почти равно
scheduler.schedule(() -> {
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
})
В отличие от fromFuture
работает намного сложнее. Короткий тест.
В каком потоке мы можем наблюдать значение? (предположим, что выполнение выполняется в потоке Main, а задача выполняется в ForkJoinPool)
var future = CompletableFuture
.supplyAsync(() -> {
return value;
})
... // some code here, does not metter just code
future.thenAccept(value -> {
System.out.println(Thread.currentThread())
});
И правильный ответ .... ??????
Может может быть Thread Main
или может быть Thread из ForkJoinPool
...
, потому что это racy ... и в данный момент мы принимаем значение, значение может быть уже доставлено, поэтому мы просто читаем volatile
поле в потоке читателя (поток Main), в противном случае поток Main просто собирается установить acceptor
, так что акцептор будет вызываться позже в потоке ForkJoinPool
.
Верно, поэтому, когда вы используете fromFuture
с subscribeOn
, нет никакой гарантии, что поток subscribeOn
будет соблюдать значение заданного CompletableFuture
.
Именно поэтому publishOn
- единственный способ обеспечить обработку значения в нужном потоке.
Хорошо, я должен использовать publishOn
все время вниз ???
И да, и нет. Это зависит.
Если вы используете Mono
- в 99% случаев вы можете использовать publishOn
, если хотите убедиться, что обработка данных происходит в определенном потоке - всегда используйте publishOn
.
Не беспокойтесь о возможных накладных расходах , Project Reactor позаботится о вас, даже если вы использовали его случайно. Project Reactor имеет несколько оптимизаций, которые могут заменить ваш publishOn
на subscribeOn
(если это безопасно, не нарушая поведения) во время выполнения, так что вы получите лучшее.
Часть 2. Падение в кроличью нору Scheduelr
s
Никогда не используйте Schedulers.immediate()
это почти не планировщик операций, который в основном делает
Schedulers.immediate().scheduler(runnable) {
runnable.run()
}
Правильно, это не приносит пользы пользователям реактора, и мы используем его только для внутренних нужд.
Хорошо, тогда как я могу использовать Планировщик, чтобы использовать его в императивном мире в качестве исполнителя
Есть два варианта:
Быстрый путь: пошаговое руководство
1.a) Создайте свой ограниченный Executor
. (например, Executors.fixed...
)
1.b) Создайте свой ограниченный ScheduledExecutorService
, если вы хотите получить мощность периодических c задач и отложенных задач
2) Создайте Scheduler
из вашего исполнителя, используя Schedulers.fromExecutorXXX
API
3) Используйте свой ограниченный Executor
в императивном мире, используйте свой Scheduler
, который является оберткой вокруг ограниченного для реактивного мира
Длинный путь
Скоро будет ...
Часть 3. Как сериализовать выполнение.
Скоро будет