Как мы можем перебрать и распечатать значения из Reactor Flux или Mono FlatMap или FlatMapMany? - PullRequest
0 голосов
/ 24 марта 2020

Learning Reactor с Spring Boot.

Использование примера API:

https://jsonplaceholder.typicode.com/todos/1
{
  "userId": 1,
  "id": 1,
  "title": "delectus aut autem",
  "completed": false
}

Требуется отобразить вышеупомянутое в объект (определенный pojo SingleUser) и распечатать вывод.

private WebClient webClient = WebClient.create("https://jsonplaceholder.typicode.com");

private Mono<ClientResponse> responseMono = webClient.get()
          .uri("/todos/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange();

public String getResult() {
   return ">> result = " + responseMono.flatMap(res -> res.bodyToMono(String.class)).block();
}

При использовании вышеупомянутого .. результат:

>> result = {
  "userId": 1,
  "id": 1,
  "title": "delectus aut autem",
  "completed": false
}

Как я могу выполнить итерацию и распечатать все значения при использовании Flux, как показано ниже ?

public Flux<SingleUser> listUsers1() {
    return webClient.get()
             .uri("/todos/1")
             .retrieve()
             .bodyToFlux(SingleUser.class);
}

public String getUsers1() {
   return ">> Get Users 1= " + listUsers1();
}

public Flux<SingleUser> listUsers2() {
   return webClient.get()
             .uri("/todos/1")
             .exchange()
             .flatMapMany(clientResponse -> clientResponse.bodyToFlux(SingleUser.class));
}

public String getUsers2() {
   return ">> Get Users 2= " + listUsers2();
}

Результат при использовании exchange() и retrieve():

>> Get Users 1= MonoFlatMapMany
>> Get Users 2= MonoFlatMapMany

Как мы можем перебрать объект и напечатать значения?

1 Ответ

3 голосов
/ 24 марта 2020

Во-первых, обратите внимание, что вы имеете дело с асинхронной неблокирующей моделью, но вы в основном прибегаете к блокировке в демонстрационных целях.

Предел этого переключения с асинхронного c на син c показывает при работе с несколькими значениями, потому что хотя каждое из этих Flux может быть распечатано, вы не будете контролировать «итерацию» (которая может происходить чередующимся образом между двумя запросами):

Существует семейство операторов Reactor, называемых «побочными эффектами», когда вы хотите посмотреть значения в последовательности, например, например. войти / распечатать их: doOn*. Вы можете использовать doOnNext для печати каждого значения, испускаемого Flux:

listUsers1().doOnNext(u -> System.out.println("listUsers1 received " + u);
listUsers2().doOnNext(u -> System.out.println("listUsers2 received " + u);

Единственная проблема заключается в том, что он не подписывается на соответствующую печать Flux, поэтому ничего не произойдет. Даже если вы просто .subscribe() в своем тесте, приложение может немедленно выйти, не дожидаясь окончания этих двух асинхронных c последовательностей.

Так что, как и в первом примере, вам нужно заблокировать. С Flux вы можете использовать .blockLast() для блокировки до завершения потока.

Вторая проблема: ваши getUsersX() методы ожидают, что смогут получить String и вернуть его синхронно. Это не будет практично с Flux даже с blockLast сверху:

public String getUsers1() {
       return ">> Get Users 1= " + listUsers1().doOnNext(System.out::println).blockLast();
    }

public String getUsers2() {
   return ">> Get Users 2= " + listUsers2().doOnNext(System.out::println).blockLast();
}

System.out.println(getUsers1());
System.out.println(getUsers2());

Будет регистрировать что-то вроде:

user1A
user1B
user1C
user1D
>> Get Users 1= user1D
user2A
user2B
user2C
>> Get Users 2= user2C

Обратите внимание, как каждый запрос печатает все значения, ТОГДА Получить сообщение пользователя с последним повторенным значением. Кроме того, из-за blockLast() первый запрос должен пройти свой ход до того, как будет запущен второй запрос.

Наиболее реактивный способ печати запроса с поддержкой параллельных запросов - это асинхронный сбор пользователей и распечатайте список, как только он станет доступным:

listUsers1().collectList().doOnNext(l1 -> System.out.println(">> Get Users 1=" + l1).subscribe();
listUsers2().collectList().doOnNext(l2 -> System.out.println(">> Get Users 2=" + l2).subscribe();

Но он страдает тем же предостережением, что не блокирует основной / тестовый: приложение может завершиться до того, как списки будут созданы, поэтому ничего не будет напечатано. Мы можем сделать ie два collectList в Mono и дождаться обоих завершений, используя Mono.when:

Mono.when(
    listUsers1().collectList().doOnNext(l1 -> System.out.println(">> Get Users 1=" + l1),
    listUsers2().collectList().doOnNext(l2 -> System.out.println(">> Get Users 2=" + l2)
)
.block();

. С этим кодом оба запроса запускаются одновременно и выводится самое быстрое для завершения во-первых, что бы это ни было.

...