Я новичок в Spring WebFlux, поэтому, пожалуйста, будьте осторожны ... Извините, если мне не хватает какой-то очевидной вещи, но я пытался искать примеры в Интернете и каждый раз, когда у меня возникают последовательные вызовы.
У меня такая ситуация:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class);
Ответ - это класс в моем проекте, но для этого примера мы можем рассматривать их как простые контейнеры одного списка.
Я бы например:
- Выполнить их параллельно (как только я назначу их для mono1 / mono2 / mono3, вызвав .subscribeOn (Schedulers.parallel ()) может быть?)
- Когда все сделано, за исключением ответа на resp1, resp2, resp3
- , если resp1 имеет результаты (список не пуст), возвращает resp1 else ...
- ..., если resp2 имеет результаты ( список не пустой) return resp2 else ...
- ... return resp3 (даже если он пуст)
Как мне этого добиться? ( так как я несколько раз терпел неудачу)
Моя первая попытка была:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
ParallelFlux.from(mono1, mono2, mono3).then().block(); // im not sure if this really execute them in parallel
Response resp1 = mono1.block();
Response resp2 = mono2.block();
Response resp3 = mono3.block();
if (resp1.isNotEmpty()) {
return resp1;
}
if (resp2.isNotEmpty()) {
return resp2;
}
return resp3;
Это не похоже на работу, делает ParallelFlux.fr om (mono1, mono2, mono3) .then (). block () действительно запустить эти моно параллельно? Также зачем мне ParallelFlux? Разве я не могу просто сказать «запустить этот моно в отдельном потоке», как только я создаю каждый моно? Каждый .block () фактически повторяет вызов .... как его повторное выполнение моно ... почему?
ОБНОВЛЕНИЕ:
Читая комментарии, которые я изменил мой код к этому:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Tuple3<Response, Response, Response> all = Mono.zip(mono1, mono2, mono3).block();
Response resp1 = all.getT1();
Response resp2 = all.getT2();
Response resp3 = all.getT3();
if (resp1.hasMessages()) {
return resp1;
}
if (resp2.hasMessages()) {
return resp2;
}
return resp3;
Теперь, похоже, работает. Нужно ли делать что-то еще, или я в порядке с этим решением? Должен ли я также изменить Mono.zip(mono1, mono2, mono3).block()
в Mono.zip(mono1, mono2, mono3).subscribeOn(Schedulers.parallel()).block()
? ps Теперь я снова читаю документы и думаю, что мне следует использовать Schedulers.elasti c () вместо Schedulers.parallel ().