Поток не подписывается в Spring 5 реактор - PullRequest
0 голосов
/ 18 сентября 2018

Я, наверное, что-то упускаю, но не могу понять, что это.

Следующий код ничего не делает:

webClient.get().uri("/some/path/here").retrieve()
     .bodyToMono(GetLocationsResponse.class)
     .doOnNext(System.out::println)
     .subscribe();

Если я пытаюсь заблокировать вызов, он работает нормально:

webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();

Странная вещь в том, что если я создаю Flux «вручную» (т.е. не из весеннего webClient), это работает нормально:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .doOnNext(System.out::println)
    .subscribe();

Может кто-нибудь объяснить, что я делаю не так? Разве .subscribe() не должен выполнять операцию в первом случае, как это было в последнем случае?

Спасибо!

1 Ответ

0 голосов
/ 18 сентября 2018

Краткий ответ

subscribe не блокирует текущий поток, это означает, что основной поток приложения может завершиться раньше, чем Flux выпустит какой-либо элемент.Так что либо используйте block, либо используйте ожидание в главном потоке.

Подробности

Вызов без аргументов subscribe () просто делает request(unbounded) на Flux без настройки каких-либо Subscriber,Обычно она запускается в отдельном потоке , но не блокирует текущий поток .Скорее всего, ваш основной поток заканчивается до того, как WebClient получил ответ в этом отдельном потоке, и произошел пассивный побочный эффект doOnNext(...).

Чтобы проиллюстрировать / проверить, что операция запущена, дождитесьнекоторое время в основной ветке.Просто поставьте следующую строку сразу после subscribe() вызова:

Thread.sleep(1000);

Теперь, поиграв со значением тайм-аута, вы сможете увидеть результат печати.

Давайте теперь неявно отправим пользовательский Scheduler для асинхронных операций и дождемся завершения всех его задач.Кроме того, давайте передадим аргумент System.out::println как subscribe(...) вместо doOnNext, чтобы полный код выглядел следующим образом:

ExecutorService executor = Executors.newSingleThreadExecutor(); 

webClient.get().uri("/some/path/here").retrieve()
    .bodyToMono(GetLocationsResponse.class)
    .publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
    .subscribe(System.out::println); //still non-blocking

executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread 

В этом примере используется немного другая подписка (потребитель) .Наиболее важно, он добавляет publishOn (Планировщик) , который поддерживается ExecutorService.Последний используется затем для ожидания завершения в главном потоке.

Конечно, гораздо более простой способ достичь того же результата - использовать block(), как вы упомянули изначально:

webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();

Наконец, обратите внимание на третий пример с Flux.just(...)...subscribe() - кажется, он просто быстро завершается, прежде чем ваш основной поток завершается.Это связано с тем, что для испускания нескольких элементов String требуется гораздо меньше времени по сравнению с выделением одного элемента GetLocationsResponse (подразумевается время для запроса на запись + ответ на чтение + синтаксический анализ в POJO).Однако, если вы сделаете это Flux для задержки элементов, вы получите то же самое поведение:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
    .doOnNext(System.out::println)
    .subscribe(); 


Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500))
    .doOnNext(System.out::println)
    .blockLast(); //and that makes it printing back again
...