CompletableFuture - запустить несколько вызовов отдыха параллельно и получить другой результат - PullRequest
2 голосов
/ 10 июля 2020

У меня довольно общее или уникальное требование. Например, у меня есть следующий список AccountDetails:

List<AccountDetails>

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;   
}

Все приведенные выше поля, кроме bankAccountId, извлекаются из внешнего вызова службы REST. Я хочу вызвать все службы REST параллельно и обновить каждый объект в списке:

Итак, это выглядит так:

Для каждого accountDetails

  • Вызов ипотечной службы REST и обновление поля martgageAccountId (REST возвращает объект MortgageInfo)
  • Вызов службы REST транзакции и обновление поля noOfTrans (REST возвращает Transactions объект)
  • Адрес вызова REST обслуживание и обновление addressLine поле (REST возвращает Address объект)
  • Ссылка на вызов службы REST и обновление externalLink поля. (REST возвращает Links объект)

Я хочу, чтобы все вышеперечисленные вызовы выполнялись параллельно и для каждого AcccountDetails объекта в списке. Если есть исключение, я хочу аккуратно обработать его. Обратите внимание, что каждая из вышеперечисленных служб REST возвращает разные настраиваемые объекты

. Я не понимаю, как добиться этого с помощью цепочки CompletableFuture. Не уверен, что нужно использовать allOf или thenCombine (что занимает всего два) или thenCompose и как их все соединить.

Есть примеры / идеи?

Ответы [ 4 ]

2 голосов
/ 11 июля 2020

Поскольку вы отметили spring-boot, я предполагаю, что вы его используете, и ваши услуги написаны на Spring framework. поэтому я предоставил ответ, связанный с фреймворком Spring.

прежде всего я создал интерфейс для реализации rest API как asyn c.

public interface AsyncRestCall<T> {
   /** this is a hypothetical method with hypothetical params!*/
   CompletableFuture<T> call(String bankAccountId); 
   String type();
}

, тогда вы можете иметь реализация для вашей службы такая:

Как вы видите в этой реализации, я использовал MortgageRest, который представляет собой службу отдыха для Mortgage.

 @Service
 public class MortgageService implements AsyncRestCall<MortgageInfo> {

   private final MortgageRest mortgageRest;

   @Autowired
   public MortgageService(MortgageRest mortgageRest) {
       this.mortgageRest = mortgageRest;
   }

   @Override
   public CompletableFuture<MortgageInfo> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(() -> mortgageRest.service(bankAccountId));
    }

   @Override
   public String type() {
      return "mortgage";
   } 
} 

Остаточная ипотека:

@Service
public class MortgageRest {
  private RestTemplate restTemplate;
  public MortgageRest(RestTemplate restTemplate) {
     this.restTemplate = restTemplate;
  }
  public MortgageInfo service(String bankAccountId) {
     return new MortgageInfo("123455" + bankAccountId);
  }
}

для других сервисов отдыха сделайте это.

@Service
public class TransactionService implements AsyncRestCall<Transactions> {

   private final TransactionRest transactionRest;

   public TransactionService(TransactionRest transactionRest) {
      this.transactionRest = transactionRest;
   } 

   @Override
   public CompletableFuture<Transactions> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(transactionRest::service);
   }

   @Override
   public String type() {
       return "transactions";
   } 
} 

TransactionRest:

 @Service
 public class TransactionRest {

   public Transactions service() {
       return new Transactions(12);
   }
 }

теперь вам нужно иметь доступ ко всем реализациям AsyncRestCall. для этого порпуса вы можете объявить класс примерно так:

@Service
public class RestCallHolder {

  private final List<AsyncRestCall> asyncRestCalls;

  public RestCallHolder(List<AsyncRestCall> asyncRestCalls) {
      this.asyncRestCalls = asyncRestCalls;
  }

  public List<AsyncRestCall> getAsyncRestCalls() {
      return asyncRestCalls;
  }
}

AccountDetailService (вы можете назвать то, что вам нравится) использует CompleteableFuture для параллельного вызова сервисов отдыха.

в этом serv ie каждый bankAccountId rest вызовы будут сохраняться в Map<String, Map<String, Object>> result = new HashMap<>();, что внешний ключ карты будет хранить значение bankAccountId как ключ, а его значение - это вызовы rest services, которые они сохранят на карте (внутренняя карта). ключ - это тип, а значение - это ответ на вызов. в конце путем перебора accountDetails обновит свои свойства.

@Service
public class AccountDetailService {

  private final RestCallHolder restCallHolder;

  public AccountDetailService(RestCallHolder restCallHolder) {
      this.restCallHolder = restCallHolder;
  }

  public List<AccountDetail> update(List<AccountDetail> accountDetails) {
     Map<String, Map<String, Object>> result = new HashMap<>();
     List<AccountDetail> finalAccountDetails = new ArrayList<>();

     accountDetails.forEach(accountDetail -> {
          List<CompletableFuture> futures = restCallHolder.getAsyncRestCalls()
                    .stream()
                    .map(rest -> rest.call(accountDetail.getBankAccountId()))
                    .collect(Collectors.toList());

     CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                 .thenAccept(aVoid -> { 
                    Map<String, Object> res = restCallHolder.getAsyncRestCalls()
                              .stream()
                              .map(rest -> new AbstractMap.SimpleEntry<>(rest.type(),
                                  rest.call(accountDetail.getBankAccountId()).join()))
                              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                           result.put(accountDetail.getBankAccountId(), res);
                      }
                   ).handle((aVoid, throwable) -> {
                      return null; // handle the exception here 
             }).join();
            }
    );

      accountDetails.forEach(accountDetail -> finalAccountDetails.add(AccountDetail.builder()
             .bankAccountId(accountDetail.getBankAccountId())
             .mortgageAccountId(((MortgageInfo) result.get(accountDetail.getBankAccountId()).get("mortgage")).getMortgageAccountId())
             .noOfTrans(((Transactions) result.get(accountDetail.getBankAccountId()).get("transactions")).getNoOfTrans())
             .build()));
     return finalAccountDetails;
   }
 }
1 голос
/ 11 июля 2020

Если я просто присвою класс вашей учетной записи:

class Account {
  String fieldA;
  String fieldB;
  String fieldC;

  Account(String fieldA, String fieldB, String fieldC) {
    this.fieldA = fieldA;
    this.fieldB = fieldB;
    this.fieldC = fieldC;
  }
}

Тогда вы можете использовать CompletableFuture#allOf(...), чтобы дождаться результатов всего завершенного будущего, по одному на обновление поля, а затем получить результат из эти фьючерсы индивидуально. Мы не можем использовать результат allOf, потому что он ничего не возвращает (void).

Account account = CompletableFuture.allOf(cfA, cfB, cfC)
    .thenApply(ignored -> {
      String a = cfA.join();
      String b = cfB.join();
      String c = cfC.join();
      return new Account(a, b, c);
    }).join(); // or get(...) with timeout

Мы можем использовать join в thenApply, потому что все завершаемые фьючерсы выполняются на этом этапе. Вы можете изменить приведенный выше блок кода, чтобы адаптировать ваш logi c, например, обновив поля вместо создания нового объекта. Обратите внимание, что join() выше может вызвать исключение, когда завершаемое будущее завершается в исключительных случаях. Вы можете изменить свое завершенное будущее на handle() его правильно перед отправкой будущего на allOf(...) или спросить, isCompletedExceptionally() перед использованием join():

CompletableFuture.allOf(cfA, cfB, cfC)
    .thenRun(() -> {
      if (!cfA.isCompletedExceptionally()) {
        account.fieldA = cfA.join();
      }
      if (!cfB.isCompletedExceptionally()) {
        account.fieldB = cfB.join();
      }
      if (!cfC.isCompletedExceptionally()) {
        account.fieldC = cfC.join();
      }
    }).join(); // or get(...) with timeout

Преимущества обновления полей внутри одного этапа завершения эти операции выполняются в одном потоке, поэтому вам не нужно беспокоиться о параллельной модификации.

1 голос
/ 11 июля 2020

Я бы возложил ответственность за выборку значений полей в сам объект модели. Вот три альтернативных решения с использованием параллельных потоков, потоков и исполнителя, а также a для l oop и исполнителя.

Решение 1:

accounts.parallelStream()
        .<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
                account::updateAddressLine, account::updateExternalLink))
        .map(RestRequest::new)
        .forEach(RestRequest::run);

Решение 2:

Executor executor = Executors.newFixedThreadPool(PARALLELISM);
accounts.stream()
        .<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
                account::updateAddressLine, account::updateExternalLink))
        .map(RestRequest::new)
        .forEach(executor::execute);

Решение 3:

Executor executor = Executors.newFixedThreadPool(PARALLELISM);
for (AccountDetails account : accounts) {
    execute(executor, account::updateMortgage);
    execute(executor, account::updateNoOfTrans);
    execute(executor, account::updateAddressLine);
    execute(executor, account::updateExternalLink);
}

private static void execute(Executor executor, Runnable task) {
    executor.execute(new RestRequest(task));
}

Общий код:

class RestRequest implements Runnable {
    private final Runnable task;

    RestRequest(Runnable task) {
        this.task = task;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (Exception e) {
            // A request failed. Others will not be canceled.
        }
    }
}

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;

    void fetchMortgage() {
        mortgageAccountId = MortgageService.getMortgage(bankAccountId).getAccountId();
    }

    void fetchNoOfTrans() {
        noOfTrans = TransactionService.getTransactions(bankAccountId).getAmount();
    }

    void fetchAddressLine() {
        addressLine = AddressService.getAddress(bankAccountId).getLine();
    }

    void fetchExternalLink() {
        externalLink = LinkService.getLinks(bankAccountId).getExternal();
    }
}
1 голос
/ 10 июля 2020
AccountDetails accountDetails = new AccountDetails();

CompletableFuture.allOf(
                        CompletableFuture.
                                supplyAsync(() -> //CALL MORTAGE INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setMortgageAccountId(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setNoOfTrans(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setAddressLine(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setExternalLink(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                ).join();
...