Выполните три моно параллельно, как только они будут созданы, подождите, пока все не закончат sh и соберите результаты с указанным c порядком / логикой - PullRequest
0 голосов
/ 25 марта 2020

Я новичок в Spring WebFlux, поэтому, пожалуйста, будьте осторожны ... Извините, если мне не хватает какой-то очевидной вещи, но я пытался искать примеры в Интернете и каждый раз, когда у меня возникают последовательные вызовы.

У меня такая ситуация:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class);

Ответ - это класс в моем проекте, но для этого примера мы можем рассматривать их как простые контейнеры одного списка.

Я бы например:

  • Выполнить их параллельно (как только я назначу их для mono1 / mono2 / mono3, вызвав .subscribeOn (Schedulers.parallel ()) может быть?)
  • Когда все сделано, за исключением ответа на resp1, resp2, resp3
  • , если resp1 имеет результаты (список не пуст), возвращает resp1 else ...
  • ..., если resp2 имеет результаты ( список не пустой) return resp2 else ...
  • ... return resp3 (даже если он пуст)

Как мне этого добиться? ( так как я несколько раз терпел неудачу)

Моя первая попытка была:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
ParallelFlux.from(mono1, mono2, mono3).then().block(); // im not sure if this really execute them in parallel
Response resp1 = mono1.block();
Response resp2 = mono2.block();
Response resp3 = mono3.block();
if (resp1.isNotEmpty()) {
    return resp1;
}
if (resp2.isNotEmpty()) {
    return resp2;
}
return resp3;

Это не похоже на работу, делает ParallelFlux.fr om (mono1, mono2, mono3) .then (). block () действительно запустить эти моно параллельно? Также зачем мне ParallelFlux? Разве я не могу просто сказать «запустить этот моно в отдельном потоке», как только я создаю каждый моно? Каждый .block () фактически повторяет вызов .... как его повторное выполнение моно ... почему?

ОБНОВЛЕНИЕ:

Читая комментарии, которые я изменил мой код к этому:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());

Tuple3<Response, Response, Response> all = Mono.zip(mono1, mono2, mono3).block();

Response resp1 = all.getT1();
Response resp2 = all.getT2();
Response resp3 = all.getT3();

if (resp1.hasMessages()) {
    return resp1;
}
if (resp2.hasMessages()) {
    return resp2;
}
return resp3;

Теперь, похоже, работает. Нужно ли делать что-то еще, или я в порядке с этим решением? Должен ли я также изменить Mono.zip(mono1, mono2, mono3).block() в Mono.zip(mono1, mono2, mono3).subscribeOn(Schedulers.parallel()).block()? ps Теперь я снова читаю документы и думаю, что мне следует использовать Schedulers.elasti c () вместо Schedulers.parallel ().

1 Ответ

1 голос
/ 25 марта 2020

Создание моно не выполняет его автоматически. Вам нужен оператор терминала, например subscribe или block, чтобы инициировать выполнение (subscribeOn не является оператором терминала. Он вам не нужен, если вы не хотите отложить выполнение до другого пула потоков. По умолчанию он использует пул потоков по умолчанию). Если вы хотите, чтобы несколько моно работали параллельно, вы можете использовать оператор zip.

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());

return Mono.zip(mono1, mono2, mono3)
        .map(t -> {
            if (t.getT1().isEmpty()) {
                if (t.getT2().isEmpty()) {
                    return t.getT3();
                } else {
                    return t.getT2();
                }
            } else {
                return t.getT1();
            }
        });

Примечание: вызов этого не выполняется и дает вам результат. Он возвращает вам моно, по которому вы можете вызвать subscribe(), чтобы получить результат.

...