Ожидание множественных моно-ответов Spring WebClient - PullRequest
1 голос
/ 18 октября 2019

Я пытаюсь вызвать внешнюю службу в приложении микро-службы, чтобы получить все ответы параллельно и объединить их перед началом другого вычисления. Я знаю, что могу использовать вызов block () для каждого объекта Mono, но это побьет цель использования реактивного API. Можно ли запустить все запросы параллельно и объединить их в одной точке.

Пример кода приведен ниже. В этом случае «Готово» печатается до фактического ответа. Я также знаю, что вызов подписки не блокирует.

Я хочу, чтобы "Готово" печаталось после того, как все ответы были собраны, поэтому нужна какая-то блокировка. однако я не хочу блокировать каждый запрос

final List<Mono<String>> responseOne = new ArrayList<>();
    IntStream.range(0, 10).forEach(i -> {
        Mono<String> responseMono =
                WebClient.create("https://jsonplaceholder.typicode.com/posts")
                        .post()
                        .retrieve()
                        .bodyToMono(String.class)
                ;
        System.out.println("create mono response lazy initialization");
        responseOne.add(responseMono);
    });

    Flux.merge(responseOne).collectList().subscribe( res -> {

        System.out.println(res);
    });
    System.out.println("Done");

Исходя из предложения, я пришел к этому, который, кажется, работает для меня.

StopWatch watch = new StopWatch();
    watch.start();
    final List<Mono<String>> responseOne = new ArrayList<>();
    IntStream.range(0, 10).forEach(i -> {
        Mono<String> responseMono =
                WebClient.create("https://jsonplaceholder.typicode.com/posts")
                        .post()
                        .retrieve()
                        .bodyToMono(String.class);
        System.out.println("create mono response lazy initialization");
        responseOne.add(responseMono);
    });
    CompletableFuture<List<String>> futureCount = new CompletableFuture<>();
    List<String>  res = new ArrayList<>();
    Mono.zip(responseOne, Arrays::asList)
            .flatMapIterable(objects -> objects) // make flux of objects
            .doOnComplete(() -> {
                futureCount.complete(res);
            }) // will be printed on completion of the flux created above
            .subscribe(responseString -> {
                        res.add((String) responseString);
                    }
            );

    watch.stop();
    List<String> response = futureCount.get();
    System.out.println(response);
    // do rest of the computation
    System.out.println(watch.getLastTaskTimeMillis());

Ответы [ 2 ]

2 голосов
/ 18 октября 2019
  1. Если вы хотите, чтобы ваши вызовы были параллельными, рекомендуется использовать Mono.zip
  2. Теперь вы хотите, чтобы Done печатался после сбора всех ответов

Итак, вы можете изменить свой код, как показано ниже

final List<Mono<String>> responseMonos = IntStream.range(0, 10).mapToObj(
        index -> WebClient.create("https://jsonplaceholder.typicode.com/posts").post().retrieve()
            .bodyToMono(String.class)).collect(Collectors.toList()); // create iterable of mono of network calls

Mono.zip(responseMonos, Arrays::asList) // make parallel network calls and collect it to a list
        .flatMapIterable(objects -> objects) // make flux of objects
        .doOnComplete(() -> System.out.println("Done")) // will be printed on completion of the flux created above
        .subscribe(responseString -> System.out.println("responseString = " + responseString)); // subscribe and start emitting values from flux

Также не рекомендуется явно указывать subscribe или block в своем реактивном коде.

0 голосов
/ 18 октября 2019

возможно ли запустить все запросы параллельно и объединить их в одной точке.

Это именно то, что ваш код уже делает. Если вы мне не верите, наберите .delayElement(Duration.ofSeconds(2)) после вашего bodyToMono() звонка. Вы увидите, что ваш список распечатывается через 2 секунды, а не через 20 (что было бы при последовательном выполнении 10 раз).

Часть , объединяющая , происходит вВаш Flux.merge().collectList() вызов.

В этом случае «Готово» печатается до фактического ответа.

Этого следует ожидать, так как ваш последний вызов System.out.println()выполнение вне цепочки реактивного обратного вызова. Если вы хотите, чтобы «Готово» печаталось после того, как распечатан ваш список (который вы вводили в заблуждение, указав имя переменной s в получателе, переданном вашему вызову subscribe()), вам нужно поместить его в этого получателя, а неза ее пределами.

Если вы взаимодействуете с императивным API, и поэтому вам необходимо заблокировать список , вы можете просто сделать:

List<String> list = Flux.merge(responseOne).collectList().block();

... который будет по-прежнему выполнять вызовы параллельно (поэтому все равно получит некоторое преимущество), но затем будет блокироваться, пока все они не будут завершены и объединены в список. (Если вы используете только реактор для этого вида использования, однако, это спорно, если это стоит.)

...