Как указано в документации Reactor для различных subscribe
методов:
Имейте в виду, что, поскольку последовательность может быть асинхронной, это
немедленно вернуть управление вызывающему потоку. Это может дать
впечатление, что потребитель не вызывается при выполнении в главном потоке
или юнит тест например.
Это означает, что конец основного метода достигнут, и, таким образом, основной поток завершает работу, прежде чем какой-либо поток сможет подписаться на цепочку Reactive, как упомянул Петр.
Что вы хотите сделать, это дождаться завершения всей цепочки, прежде чем печатать содержимое массива.
Наивный способ сделать это:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.blockLast();
System.out.println("After: " + arrList);
Здесь вы блокируете выполнение в главном потоке до тех пор, пока не будет обработан последний элемент потока. Таким образом, последний System.out не будет выполняться, пока ваш ArrayList не будет полностью заполнен.
Помните, что способ запуска кода в консольном приложении по сравнению с серверной средой, такой как Netty, немного отличается. Единственный способ заставить консольное приложение ждать всех подписок, это block
.
Но блокировка не разрешена в параллельных потоках. Таким образом, этот подход не будет работать, скажем, в среде Netty. Там ваш сервер будет работать до явного выключения, и поэтому subscribe
будет в порядке.
Однако в приведенном выше фрагменте кода вы блокируете не только для предотвращения выхода из приложения, но и для ожидания, прежде чем читать заполненные данные.
Улучшение вышеуказанного кода будет следующим:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.doOnComplete(() -> System.out.println("After: " + arrList))
.blockLast();
Даже здесь doOnComplete
получает доступ к данным вне реактивной цепочки. Чтобы предотвратить это, вы должны собрать элементы Flux в самой цепочке, например:
System.out.println("Before.");
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.collectList()
.doOnSuccess(list -> System.out.println("After: " + list))
.block();
Опять же, помните, что при работе в Netty (скажем, в приложении Spring Webflux) приведенный выше код будет заканчиваться на subscribe()
.
Обратите внимание, что переход от Flux к списку (или любой коллекции) означает, что вы переключаетесь с реактивной парадигмы на императивное программирование. Вы должны иметь возможность реализовать любую функциональность в самой парадигме Reactive.