Как использовать планировщик Project Reactor с библиотеками на основе Executor? - PullRequest
4 голосов
/ 21 марта 2020

Project Reactor предоставляет отличный способ определить, какой пул потоков для кода должен выполняться, определив Scheduler. Он также обеспечивает мост к библиотекам, которые используют CompletableFuture, хотя Mono.fromFuture(..).

AWS 's asyn c клиент для DyanmoDB , выполняет CompletableFuture' s возвращается из вызовов API на java.util.concurrent.Executor. По умолчанию он создает Executor, поддерживаемый пулом потоков, который он также создает. В результате даже потоки с определенным Scheduler, как Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic()), выполняются в потоке из пула, который библиотека создает вместо одного из Schedulers.boundedElastic(). Таким образом, мы видим имена потоков, такие как sdk-async-response-0-2, вместо имен, таких как boundedElastic-1.

К счастью, библиотека позволяет нам предоставлять наши собственные Executor как , показанный здесь , поэтому у меня вопрос:

Как вы строите Executor, который использует поток из Scheduler , определенный для этой части потока во время выполнения?

Вариант использования

У нас есть класс репозитория, который имеет метод findById, и нам нужно, чтобы вызывающая сторона могла контролировать, на каком Scheduler работать, потому что он используется в этих совершенно разных контекстах:

  1. Ответы API, которые запускаются в планировщике Schedulers.boundedElastic().
  2. Обработка сообщений Kafka, которые выполняются по порядку, в потоке на раздел из определенного планировщика, как показано в документах Reactor Kafka .

Попытки

Мы попытались определить Executor с использованием Schedulers.immediate() и Runnable::run, как показано здесь, но оба приводят к выполнению в потоке события Netty l oop thread (пример nam e: aws-java-sdk-NettyEventLoop-0-2), а не поток из определенного Scheduler.

DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        runnable -> Schedulers.immediate().schedule(runnable)
    ))
    .build();
DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        Runnable::run
    ))
    .build();

1 Ответ

6 голосов
/ 22 марта 2020

Часть 1. Наблюдение против подписки

Рассматривая вопрос, я вижу необходимость наблюдать элементов после выполнения в определенном потоке. Чтобы быть точным, наблюдение в этом контексте означает * возможность работать со значением в потоке в некотором указанном c потоке. В Rx Java у нас есть правильный оператор, который называется точно так же, но в Project Reactor мы вызываем такую ​​же операцию, как publishOn.

Таким образом, *, если вы хотите обработать данные * на Schedulers.boundedElastic(), тогда вы должны использовать следующую конструкцию

Mono.fromFuture(..)
    .publishOn(Schedulers.boundedElastic())

НО Подождите, .subscribeOn также сработало ???

Читая предыдущую конструкцию, вы можете начать беспокоиться, потому что вы на 100% уверены, что

Mono.fromRunnable(..)
    .subscribeOn(Schedulers.boundedElastic())

отправляет onNext в потоке boundedElastic-1, так что не так с тем же fromFuture.

, и тут возникает хитрость:

Никогда не используйте subscribeOn с Futures / CompletableFuture или чем-либо, что может использовать собственный асин c механизм под

Если мы посмотрим на то, что происходит за subscribeOn, вы обнаружит что-то вроде следующего:

//  Simplified version of SubscribeOn operator
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Scheduler scheduler;
    Publisher<T> parent;
    scheduler.schedule(() -> parent.subscribe(actual));
}

Что в основном означает, что метод subscribe родителя будет вызываться в отдельном потоке.

Такая техника работает для fromRunnable, fromSupplier, fromCallable потому что их логика c происходит в методе subscribe:

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Operators.MonoSubscriber<T, T>
    sds = new Operators.MonoSubscriber<>(actual);

    actual.onSubscribe(sds);
    // skiped some parts 
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
}

, что означает s оно почти равно

scheduler.schedule(() -> {
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
})

В отличие от fromFuture работает намного сложнее. Короткий тест.

В каком потоке мы можем наблюдать значение? (предположим, что выполнение выполняется в потоке Main, а задача выполняется в ForkJoinPool)

var future = CompletableFuture
.supplyAsync(() -> {
  return value;
})
... // some code here, does not metter just code

future.thenAccept(value -> {
  System.out.println(Thread.currentThread())
});

И правильный ответ .... ??????

Может может быть Thread Main
или может быть Thread из ForkJoinPool
...
, потому что это racy ... и в данный момент мы принимаем значение, значение может быть уже доставлено, поэтому мы просто читаем volatile поле в потоке читателя (поток Main), в противном случае поток Main просто собирается установить acceptor, так что акцептор будет вызываться позже в потоке ForkJoinPool.

Верно, поэтому, когда вы используете fromFuture с subscribeOn, нет никакой гарантии, что поток subscribeOn будет соблюдать значение заданного CompletableFuture.

Именно поэтому publishOn - единственный способ обеспечить обработку значения в нужном потоке.

Хорошо, я должен использовать publishOn все время вниз ???

И да, и нет. Это зависит.

Если вы используете Mono - в 99% случаев вы можете использовать publishOn, если хотите убедиться, что обработка данных происходит в определенном потоке - всегда используйте publishOn.

Не беспокойтесь о возможных накладных расходах , Project Reactor позаботится о вас, даже если вы использовали его случайно. Project Reactor имеет несколько оптимизаций, которые могут заменить ваш publishOn на subscribeOn (если это безопасно, не нарушая поведения) во время выполнения, так что вы получите лучшее.

Часть 2. Падение в кроличью нору Scheduelr s

Никогда не используйте Schedulers.immediate()

это почти не планировщик операций, который в основном делает

Schedulers.immediate().scheduler(runnable) {
   runnable.run()
}

Правильно, это не приносит пользы пользователям реактора, и мы используем его только для внутренних нужд.

Хорошо, тогда как я могу использовать Планировщик, чтобы использовать его в императивном мире в качестве исполнителя

Есть два варианта:

Быстрый путь: пошаговое руководство

1.a) Создайте свой ограниченный Executor. (например, Executors.fixed...)
1.b) Создайте свой ограниченный ScheduledExecutorService, если вы хотите получить мощность периодических c задач и отложенных задач
2) Создайте Scheduler из вашего исполнителя, используя Schedulers.fromExecutorXXX API
3) Используйте свой ограниченный Executor в императивном мире, используйте свой Scheduler, который является оберткой вокруг ограниченного для реактивного мира

Длинный путь

Скоро будет ...

Часть 3. Как сериализовать выполнение.

Скоро будет

...