Я ожидаю найти выходные данные конвейера обработки в моем списке, но программа немедленно завершает работу.
Код, который у вас есть , устанавливает ваша реактивная цепочка в главном потоке, а затем ничего не делает ... в главном потоке. Таким образом, основной поток завершает свою работу, и поскольку потоки boundedElastic()
являются потоками демонов, другие потоки не останавливают программу, поэтому она закрывается.
Вы можете увидеть то же поведение сболее простой пример:
Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);
Конечно, вы можете вызвать newBoundedElastic("name", false)
, чтобы сделать его планировщиком с поддержкой non-daemon , но тогда вам придется отслеживать его и вызывать disposeкогда вы закончите, это действительно просто перевернет проблему (программа будет работать бесконечно, пока вы не утилизируете планировщик).
Быстрое грязное 'n' грязное решение - просто заблокировать последний элемент Flux
как последняя строка в вашей программе - поэтому, если мы добавим:
f.blockLast();
..., тогда программа ожидает, пока последний элемент не будет выпущен, прежде чем выйти, и у нас будет поведение, которое мы преследуем.
Для простого доказательства концепции это хорошо. Это не идеально в «производственном» коде. Во-первых, «отсутствие блокировки» - это общее правило в реактивном коде, поэтому, если у вас есть такие блокировки вызовов, трудно решить, намечено это или нет. Если вы добавили другие цепочки, а также хотели, чтобы они завершились, вам нужно будет добавить блокирующие вызовы для каждого из них. Это грязно и не совсем устойчиво.
Лучше было бы использовать CountDownLatch
:
CountDownLatch cdl = new CountDownLatch(1);
Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500))
.doFinally(s -> cdl.countDown())
.subscribe(System.out::println);
cdl.await();
Это имеет преимущество, заключающееся в отсутствии явной блокировки, а также способности обрабатыватьболее одного издателя одновременно (если вы устанавливаете начальное значение выше 1.) Это также является подходом, который я считаю рекомендованным в целом для такого рода вещей - так что, если вы хотите наиболее широко принятое решение, это, вероятно, так.
Тем не менее, я предпочитаю Phaser
для всех примеров, когда вам нужно подождать нескольких издателей, а не одного - он работает аналогично CountdownLatch, но может динамически register()
и deregister()
. Это означает, что вы можете создать один фазер, а затем легко зарегистрировать в нем несколько издателей, не требуя изменения исходного значения, например:
Phaser phaser = new Phaser(1);
Flux.just(1, 2, 3, 4, 5)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
phaser.arriveAndAwaitAdvance();
(Вы, конечно, можете обернуть логику onSubscribe
и doFinally
вотдельный метод тоже при необходимости.)