Параллелизм планировщика Spring Reactor Webflux - PullRequest
3 голосов
/ 25 мая 2020

Для полностью неблокирующих сквозных реактивных вызовов рекомендуется явно вызывать publishOn или subscribeOn для переключения планировщиков? Для задач, потребляющих ЦП, или задач, не потребляющих ресурсы, выгодно ли всегда использовать параллельный поток для оптимизации производительности?

Ответы [ 2 ]

2 голосов
/ 26 мая 2020

Стоит отметить, что я предполагаю, что контекст здесь - это Webflux, а не реактор в целом (поскольку вопрос помечен как таковой). Рекомендации, конечно, могут сильно различаться, если мы говорим об обобщенном варианте использования реактора без учета Webflux.

Для полностью неблокирующих сквозных реактивных вызовов рекомендуется явно вызывать publishOn или subscribeOn для переключения планировщиков?

Общая рекомендация - нет для явного вызова этих методов, если у вас нет на то причины. (Нет ничего плохого в том, чтобы использовать их в правильном контексте, но использование их «просто потому, что», скорее всего, не принесет пользы.) параллельный поток для оптимизации производительности?

Это зависит от того, чего вы стремитесь достичь, и что вы подразумеваете под задачами, потребляющими центральный процессор (или процессором). Обратите внимание, что здесь я говорю о действительно задачах, интенсивно использующих ЦП, а не о блокирующем коде - и в этом случае я бы в идеале передал часть, интенсивно использующую ЦП, на другой микросервис, что позволит вам масштабировать это по мере необходимости, отдельно от вашего сервиса Webflux.

Использование параллельного потока (и его запуск в параллельном планировщике) должно использовать все доступные ядра для обработки данных, что вполне может привести к их более быстрой обработке. Но имейте в виду, что по умолчанию у вас также есть событие l oop, выполняющееся для каждого ядра, поэтому вы, по сути, «украли» некоторую доступную мощность из события l oop, чтобы достичь этого. Идеально ли это, зависит от вашего варианта использования, но обычно это не принесет большой пользы.

Вместо этого я бы рекомендовал два подхода:

  • Если вы можете сломаться разделите вашу задачу с интенсивным использованием ЦП на небольшие фрагменты с низкой интенсивностью, сделайте это - и тогда вы можете оставить ее на событии l oop. Это позволяет событию l oop продолжать выполняться своевременно, в то время как эти задачи, интенсивно использующие процессор, планируются так, как они могут быть.
  • Если вы не можете сломать его, разверните отдельный планировщик (необязательно с низким приоритетом, чтобы с меньшей вероятностью украсть ресурсы из события l oop), а затем передать все задачи, интенсивно использующие процессор, на это. Это имеет недостаток, заключающийся в создании большого количества потоков, но снова оставляет событие l oop свободным. По умолчанию у вас будет столько потоков, сколько ядер для события l oop - вы можете wi sh, чтобы уменьшить это количество, чтобы дать вашему "интенсивно загружающему ЦП" планировщику больше ядер для игры.
1 голос
/ 25 мая 2020

Для полностью неблокирующих сквозных реактивных вызовов рекомендуется явно вызывать publishOn или subscribeOn для переключения планировщиков?

publishOn используется при публикации sh данные в нисходящий поток, а subscribeOn используется, когда вы потребляете данные из восходящего потока. Так что это действительно зависит от того, какую работу вы хотите выполнить.

Для задач, потребляющих ЦП, или непотребляющих, выгодно ли всегда использовать параллельный поток для оптимизации производительности?

Абсолютно нет. Рассмотрим этот пример:

Flux.range(1, 10)
        .parallel(4)
        .runOn(Schedulers.parallel())
        .sequential()
        .elapsed()
        .subscribe(i -> System.out.printf(" %s ", i));

Приведенный выше код является пустой тратой, потому что i будет обработан почти мгновенно. Следующий код будет работать лучше, чем приведенный выше:

Flux.range(1, 10)
        .elapsed()
        .subscribe(i -> System.out.printf(" %s ", i));

Теперь рассмотрим это:

public static <T> T someMethodThatBlocks(T i, int ms) {
    try { Thread.sleep( ms ); }
    catch (InterruptedException e) {}
    return i;
}

// some method here
Flux.range(1, 10)
        .parallel(4)
        .runOn(Schedulers.parallel())
        .map(i -> someMethodThatBlocks(i, 200))
        .sequential()
        .elapsed()
        .subscribe(i -> System.out.printf(" %s ", i));

Результат похож на:

 [210,3]  [5,1]  [0,2]  [0,4]  [196,6]  [0,8]  [0,5]  [4,7]  [196,10]  [0,9] 

Как вы можете видеть, первый ответ пришел через 210 мс, за ним последовали 3 ответа с примерно 0 прошедшим временем между ними. Цикл повторяется снова и снова. Здесь вы должны использовать параллельный поток. Обратите внимание, что создание большего количества потоков не гарантирует производительности, потому что, когда количество потоков больше, переключение контекста добавляет много накладных расходов, и, следовательно, код следует протестировать задолго до развертывания. Если имеется много блокирующих вызовов, наличие более одного потока на ЦП может дать вам повышение производительности, но если выполняемые вызовы интенсивны по ЦП, то наличие более одного потока на ЦП замедлит производительность из-за переключения контекста.

В общем, это всегда зависит от того, чего вы хотите достичь.

...