Flux / Publisher на основной ветке - PullRequest
0 голосов
/ 30 октября 2018

Я новичок в реактивном программировании / проекте-реакторе, пытаюсь понять концепции. Создал Flux с методом range и подписался. Когда я смотрю на журнал, все работает в основном потоке.

     Flux
        .range(1, 5)
        .log()
        .subscribe(System.out::println);

    System.out.println("End of Execution");

[DEBUG] ( main ) Использование ведения журнала консоли [INFO] ( main ) | onSubscribe ([Синхронный плавкий предохранитель] FluxRange.RangeSubscription) [ ИНФОРМАЦИЯ] ( main ) | запрос (неограниченный) [INFO] ( main ) | далее (1) 1 [ИНФО] ( основной ) | onNext (2) 2 [ИНФО] ( main ) | onNext (3) 3 [ИНФО] ( main ) | onNext (4) 4 [INFO] ( main ) | onNext (5) 5 [INFO] ( main ) | OnComplete () Конец исполнения

Как только Publisher завершит передачу всех элементов, будет выполнен только остальной код ( System.out.println («Конец выполнения»); в приведенном выше примере). Издатель заблокирует поток по умолчанию? Если я изменю планировщик, кажется, что он не блокирует поток.

Flux
        .range(1, 5)
        .log()
        .subscribeOn(Schedulers.elastic())
        .subscribe(System.out::println);
    System.out.println("End of Execution");
    Thread.sleep(10000);

[DEBUG] (main) Использование ведения журнала консоли Конец выполнения [INFO] ( эластик-2 ) | onSubscribe ([Синхронный плавкий предохранитель] FluxRange.RangeSubscription) [INFO] (astic-2 ) | Запрос (неограниченный) [ИНФО] ( эластичный-2 ) | onNext (1) 1 [ИНФО] (astic-2 ) | далее (2) 2 [ ИНФОРМАЦИЯ] (astic-2 ) | onNext (3) 3 [ИНФО] (astic-2 ) | далее (4) 4 [ ИНФОРМАЦИЯ] (astic-2 ) | onNext (5) 5 [ИНФО] ( эластичный-2 ) | OnComplete ()

1 Ответ

0 голосов
/ 31 октября 2018

Reactor по умолчанию не применяет модель параллелизма, и да, многие операторы продолжат работу на Thread, где произошла операция subscribe().

Но это не значит, что использование Reactor заблокирует основной поток. Пример, который вы показываете, выполняет работу в памяти, без учета ввода-вывода или задержки. Также сразу подписывается на результат.

Вы можете попробовать следующий фрагмент и увидеть что-то другое:

Flux.range(1, 5)
    .delayElements(Duration.ofMillis(100))
    .log()
    .subscribe(System.out::println);
System.out.println("End of Execution");

В логах я вижу:

INFO   --- [main] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
INFO   --- [main] reactor.Flux.ConcatMap.1 : request(unbounded)
End of Execution

В этом случае задерживающие элементы будут планировать работу по-другому - и поскольку ничто здесь не поддерживает работу JVM, приложение завершается, и ни один элемент из диапазона не используется.

В более распространенном сценарии будут задействованы операции ввода-вывода и задержки, и эта работа будет планироваться соответствующим образом и не будет блокировать основной поток приложения.

...