Краткий ответ
subscribe
не блокирует текущий поток, это означает, что основной поток приложения может завершиться раньше, чем Flux выпустит какой-либо элемент.Так что либо используйте block
, либо используйте ожидание в главном потоке.
Подробности
Вызов без аргументов subscribe () просто делает request(unbounded)
на Flux
без настройки каких-либо Subscriber
,Обычно она запускается в отдельном потоке , но не блокирует текущий поток .Скорее всего, ваш основной поток заканчивается до того, как WebClient
получил ответ в этом отдельном потоке, и произошел пассивный побочный эффект doOnNext(...)
.
Чтобы проиллюстрировать / проверить, что операция запущена, дождитесьнекоторое время в основной ветке.Просто поставьте следующую строку сразу после subscribe()
вызова:
Thread.sleep(1000);
Теперь, поиграв со значением тайм-аута, вы сможете увидеть результат печати.
Давайте теперь неявно отправим пользовательский Scheduler
для асинхронных операций и дождемся завершения всех его задач.Кроме того, давайте передадим аргумент System.out::println
как subscribe(...)
вместо doOnNext
, чтобы полный код выглядел следующим образом:
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); //still non-blocking
executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread
В этом примере используется немного другая подписка (потребитель) .Наиболее важно, он добавляет publishOn (Планировщик) , который поддерживается ExecutorService
.Последний используется затем для ожидания завершения в главном потоке.
Конечно, гораздо более простой способ достичь того же результата - использовать block()
, как вы упомянули изначально:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
Наконец, обратите внимание на третий пример с Flux.just(...)...subscribe()
- кажется, он просто быстро завершается, прежде чем ваш основной поток завершается.Это связано с тем, что для испускания нескольких элементов String
требуется гораздо меньше времени по сравнению с выделением одного элемента GetLocationsResponse
(подразумевается время для запроса на запись + ответ на чтение + синтаксический анализ в POJO).Однако, если вы сделаете это Flux
для задержки элементов, вы получите то же самое поведение:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
.doOnNext(System.out::println)
.subscribe();
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); //and that makes it printing back again