Асинхронно вызывать завершаемое будущее в цикле внутри завершаемого будущего. - PullRequest
0 голосов
/ 30 ноября 2018

Существует 3 службы, которые являются вызовами REST: serviceOne, serviceTwo и serviceThree.Обработка serviceTwo зависит от выходных данных serviceOne, а обработка serviceThree зависит от выходных данных serviceTwo.Тем не менее, serviceThree должен выполняться в цикле.

Я хочу получить окончательный ответ после завершения завершаемого обслуживания serviceThreeRespCF.В настоящее время, planResponse возвращается до того, как service3 завершит обработку.

Я попытался использовать join () для serviceThreeRespCF, но это не помогло.Есть ли лучший способ сделать это, то есть без использования get () или join ()?

public CompletionStage<PlansResponse> process() {

    final PlansResponse plansResponse = new PlansResponse();
    final CompletableFuture<PlansResponse> plansResp = new CompletableFuture<>();

    final CompletionStage<ServiceOneResp> serviceOneRespCF = getResp1(); <A REST CALL>

    return serviceOneRespCF.thenCompose(serviceOneResp -> {

        // function to validate service one resp

        if (condition based on validation)
        {
            List<Plans> plans = serviceOneResp.getPlans();
            if (condition) {
               plansResponse.setSysMsgs());
            }
            else {

                final CompletionStage<ServiceTwoResp> serviceTwoRespCF = getResp2(); <A REST CALL>
                serviceTwoRespCF.thenCompose(serviceTwoResp -> {

                    plans.parallelStream().forEach(plan -> {

                        if (Optional.ofNullable(plan.getPlanId()).isPresent()) {

                            final CompletionStage<ServiceThreeResp> serviceThreeRespCF = getResp3();  <A REST CALL>

                            serviceThreeRespCF.thenAcceptAsync(serviceThreeResp -> {

                                Set<PlanDetail> planDetails = new HashSet<>();
                                planDetails = populatePlanDetails(plan, serviceTwoResp, serviceThreeResp, planDetails);
                                planDetailsList.addAll(planDetails.stream().distinct().collect(Collectors.toList()));
                                plansResponse.setPlanDetails(planDetailsList);
                                planDetails.clear();

                            });
                        }
                    });
                    return serviceThreeRespCF;
                });
            }
        }
        plansResp.complete(plansResponse);
        return plansResp;
    });
}

EDIT 1 : пробное соединение (оператор возврата является продолжением кодавыше)

  1. непосредственно на serviceThreeRespCF

     return serviceThreeRespCF;
                }).toCompletableFuture().join();
    
  2. с использованием thenRunAsync

    return serviceThreeRespCF;
                }).thenRunAsync(() -> {
                    serviceThreeRespCF.toCompletableFuture().join();
                });
    

EDIT 2 : я пытался использовать allOf, чтобы получить значения этапов завершения service3 в будущем.(эта часть после службы 2 затем составить).Как можно видеть, ответ service2 недоступен для этого завершаемого будущего и не работает.Кроме того, это включает в себя необходимость повторять список планов 2 раза.Есть ли другой способ сделать это?

CompletableFuture<List<Service3Response>> service3ResponseList = allAsList(futures);
service3ResponseList.thenAccept(service3Responses -> {
    AtomicInteger i = new AtomicInteger(0);
    plans.parallelStream().forEach(plan -> {
        Set<PlanDetail> planDetails = new HashSet<>();
        planDetails = populatePlanDetails(acctSetFromRequest, plan, service3Responses.get(i.get()), service2Response <NOT AVAILABLE>, planDetails, acctMapFromResponse.get(plan.getPlanId()));
        planDetailsList.addAll(planDetails.stream().distinct().collect(Collectors.toList()));
        plansResponse.setDistributionPlanDetails(planDetailsList);
        planDetails.clear();
        i.addAndGet(1);
    });
});

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
            futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
...