Как ждать окончания подписки? - PullRequest
2 голосов
/ 03 июня 2019

Я хочу сделать асинхронный вызов покоя, для которого я использую Spring webclient и возвращаю Mono.Я также делаю некоторые вызовы базы данных параллельно, но это не может быть сделано реактивно по какой-то причине.

    Map<String, Object> models = new HashMap<>();

    Mono<User> users = this.webClient...;
    users.map(resp -> new UserState(userRequest, resp))
            .subscribe(response -> {
                models.put("userState", response);
            });
    Iterable<Product> messages = this.productRepository.findAll();
    models.put("products", messages);
    //Wait for users.subscribe to finish <<<<<<<<<<<<<HERE
    return new ModelAndView("messages/list", models);

Как мне ждать окончания подписки, прежде чем возвращать ModelAndView.Это было бы легко, если бы я использовал Future, где я могу сделать get(), когда захочу.

1 Ответ

1 голос
/ 04 июня 2019

Вы можете обернуть блокирующий вызов в Mono, выполняемом в отдельном планировщике, сжать его с Mono, содержащим UserState данные, и преобразовать их комбинацию в Mono<ModelAndView> (которую можно вернуть из методов контроллера Spring).Вызовы будут выполняться параллельно, результаты будут объединены после завершения обоих вызовов.

Вы можете определить отдельный ограниченный планировщик для приложения специально для блокировки вызовов и предоставить его в качестве аргумента конструктора для любого класса, который выполняет блокировкузвонки.

Код будет выглядеть следующим образом:

@Configuration 
class SchedulersConfig {

  @Bean
  Scheduler parallelScheduler(@Value("${blocking-thread-pool-size}") int threadsCount) {
    return Schedulers.parallel(threadsCount);
  }
}

@RestController
class Controller {

  final Scheduler parallelScheduler;

  ...

  Mono<User> userResponse = // webClient...

  Mono<Iterable<Product>> productsResponse = Mono.fromSupplier(productRepository::findAll)
    .subscribeOn(parallelScheduler); 

  return Mono.zip(userResponse, productsResponse, (user, products) -> 
    new ModelAndView("messages/list", 
      ImmutableMap.of(
        "userState", new UserState(userRequest, user),
        "products", products
      ))
  );
}

Обновление на основе комментария:
Если вам просто нужно выполнить HTTP-вызов асинхронно, а затемприсоединиться к ней с базой данных результатов можно следующим образом

Map<String, Object> models = new HashMap<>();
Mono<User> userMono = webClient...;
CompletableFuture<User> userFuture = userMono.toFuture();
Iterable<Product> messages = productRepository.findAll();
User user = userFuture.join();
models.put("products", messages);
models.put("userState", new UserState(userRequest, user));
return new ModelAndView("messages/list", models);
...