Выполнить параллельный поток после окончания другого потока - PullRequest
0 голосов
/ 07 февраля 2020

Кстати, я все еще изучаю weblux; Я не знаю, возможно ли это, или у меня неправильный подход, но с учетом этого параллельного потока.

Flux<String> enablers = Flux.fromIterable(enablersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.getAMono(string, entity, element))
                .sequential();

, который вызывает метод с запросом веб-клиента (service.getAMono)

webClient.post()
              .uri(url)
              .headers(headers -> headers.addAll(httpHeaders))
              .body(BodyInserters.fromObject(request))
              .retrieve()
              .bodyToMono(entity2.class);

Мне нужно дождаться окончания потока потока активаторов и обработать все ответы внутри него, причина в том, что если один из них выдаст мне ошибку или отрицательный ответ, я не буду запускать этот другой параллельный поток для блокировщиков

Flux<String> blockers = Flux.fromIterable(blockersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.callAMono(string, entity, element))
                .sequential();

Я думаю о методе "zip", но этот объединяет оба ответа, и это не то, что я хочу Если кто-нибудь может помочь мне с этим.

ОБНОВЛЕНИЕ

enablers. //handle enablers response and if error return a custom Mono<response> with .reduce

И если нет ошибки в дескрипторе enablers перейти к .thenMany с другим Flux

1 Ответ

0 голосов
/ 08 февраля 2020

Я нашел способ сделать это условно any в первом flux, например,

Flux.fromIterable(enablersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.getAMono(string, entity, element))
                .sequential()
                .any(element -> *stuff here)//condition
                .flatMap(condition->{
                        if(condition.equals(Boolean.FALSE)){
                           return Flux.fromIterable(blockersList)
                                                   .parallel()
                                                   .runOn(Schedulers.elastic())
                                                   .flatMap(element -> service.callAMono(string, entity, element))
                                                   .sequential()
                                                   .reduce(**stuff here)// handle noError response and return;
                          }
                          return Mono.just(**stuff here);//handle error response and return
                 });

Если есть другой способ сделать это, пожалуйста, я буду рад, что вы опубликуете это вот спасибо, D

...