Как правильно использовать Schedulers.single () в Project Reactor? - PullRequest
0 голосов
/ 17 апреля 2020

Я пытаюсь запустить базовый c пример с Project Reactor и его классом Flux. Исходный код должен создавать целые числа от 1 до 10 и просто выводить выданные целые числа.

Все примеры выполняются в основном методе приложения без запуска другого кода.

Это довольно легко чтобы запустить основы:

Flux.range(1, 10).subscribe(System.out::println);

Следующим шагом является испускание целых чисел в другом потоке. Это может быть достигнуто с помощью

        Flux.range(1, 10)
                .publishOn(Schedulers.newSingle("OtherThread"))
                .subscribe(System.out::println);

Как указано в ссылке на проект, Schedulers.newSingle("OtherThread") создает «отдельный поток для каждого вызова» (см. Ссылка на реактор проекта ). Ссылка объясняет, что существует также Schedulers.single(), который предоставляет доступ к контексту выполнения «одного, повторно используемого потока» и «повторно использует один и тот же поток для всех вызывающих абонентов».

Поскольку я использую потоки только одна точка в этом примере publishOn(...) Насколько я понимаю, оба метода (newSingle(...) и single()) могут использоваться взаимозаменяемо.

        Flux.range(1, 10)
                .publishOn(Schedulers.single())
                .subscribe(System.out::println);

Но последний пример ничего не печатает. И, честно говоря, после нескольких часов поиска и игры я не понимаю, почему.

Я нашел эту статью в блоге Flight of Flux 3 - Прыгающие потоки и планировщики , в которой говорится single() как "для одноразовых задач, которые могут быть запущены на уникальном ExecutorService". Но это не приносит света во тьму.

Как часто я ожидаю, что есть простой ответ на вопрос Почему newSingle(...) и single() ведут себя по-разному в этом базовом примере c? что заставит меня чувствовать себя глупо. Но я был бы более чем счастлив, если бы это, наконец, разрешило мою путаницу.

Интересная заметка сайта заключается в том, что путем введения log() пример печатается как шарм

        Flux.range(1, 10)
                .log()
                .publishOn(Schedulers.single())
                .subscribe(System.out::println);

ОБНОВЛЕНИЕ : Согласно ответу Martin Tarjányi я создал gist , который демонстрирует различное поведение с помощью небольшого фрагмента кода и поясняющего текста.

1 Ответ

0 голосов
/ 17 апреля 2020

Когда вы создаете новый одиночный планировщик, используя newSingle(String) по умолчанию, он создает новый поток, не являющийся демоном , что означает, что он будет блокировать выход приложения, пока его пул потоков не будет закрыт.

Однако, если вы используете встроенный single(), он будет использовать поток демона, который не будет препятствовать выходу приложения, даже если его работа еще не завершена. Это именно то, что вы видите в своем примере: основной поток завершил свою работу, собрав реактивный конвейер, и виртуальная машина завершает свою работу независимо от состояния отдельного потока демона.

Чтобы иметь одинаковое поведение в обоих случаях, вы можете замените подписку на doOnNext() и blockLast():

Flux.range(1, 10)
    .publishOn(Schedulers.single())
    .doOnNext(System.out::println)
    .blockLast();

Обычно использование блоков настоятельно не рекомендуется в реактивном программировании. Однако, если вашему основному потоку больше нечего делать, это нормально, вызывая block() в вашей реактивной цепочке.

...