CompletableFuture in loop: как собрать все ответы и обработать ошибки - PullRequest
0 голосов
/ 02 июля 2018

Я пытаюсь вызвать остальные API для запроса PUT в цикле. Каждый звонок CompletableFuture. Каждый вызов API возвращает объект типа RoomTypes.RoomType

  • Я хочу собрать ответы (как успешные, так и с ошибками) ответы) в разных списках. Как мне этого добиться? Я уверен я не может использовать allOf, потому что он не получит все результаты, если таковые имеются один вызов не обновляется.

  • Как регистрировать ошибки / исключения для каждого вызова?


public void sendRequestsAsync(Map<Integer, List> map1) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
    List<RoomTypes.RoomType> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    for (Map.Entry<Integer, List> entry :map1.entrySet()) { 
        CompletableFuture requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () -> 
            //API call which returns object of type RoomTypes.RoomType
            updateService.updateRoom(51,33,759,entry.getKey(),
                           new RoomTypes.RoomType(entry.getKey(),map2.get(entry.getKey()),
                                    entry.getValue())),
                    yourOwnExecutor
            )//Supply the task you wanna run, in your case http request
            .thenApply(responses::add);

    completableFutures.add(requestCompletableFuture);
}

Ответы [ 2 ]

0 голосов
/ 02 июля 2018

В качестве альтернативы, возможно, вы можете подойти к проблеме с другой точки зрения, и вместо принудительного использования CompletableFuture вместо нее можно использовать CompletionService .

Вся идея CompletionService состоит в том, что, как только ответ на данное будущее готов, он помещается в очередь, из которой вы можете получать результаты.

Альтернатива 1: без CompletableFuture

CompletionService<String> cs = new ExecutorCompletionService<>(executor);

List<Future<String>> futures = new ArrayList<>();

futures.add(cs.submit(() -> "One"));
futures.add(cs.submit(() -> "Two"));
futures.add(cs.submit(() -> "Three"));
futures.add(cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));


List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            //at this point the future is guaranteed to be solved
            //so there won't be any blocking here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}

System.out.println(successes); 
System.out.println(failures);

Что дает:

[One, Two, Three, Five]
[java.lang.RuntimeException: Sucks to be four]

Альтернатива 2: с CompletableFuture

Однако, если вам действительно нужно иметь дело с CompletableFuture, вы также можете отправить их в службу завершения, просто поместив их прямо в свою очередь:

Например, следующий вариант имеет тот же результат:

BlockingQueue<Future<String>> tasks = new ArrayBlockingQueue<>(5);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, tasks);

List<Future<String>> futures = new ArrayList<>();

futures.add(CompletableFuture.supplyAsync(() -> "One"));
futures.add(CompletableFuture.supplyAsync(() -> "Two"));
futures.add(CompletableFuture.supplyAsync(() -> "Three"));
futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));

//places all futures in completion service queue
tasks.addAll(futures);

List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            //at this point the future is guaranteed to be solved
            //so there won't be any blocking here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}
0 голосов
/ 02 июля 2018

Вы можете просто использовать allOf(), чтобы получить будущее, которое будет завершено, когда будут завершены все ваши первоначальные фьючерсы (в исключительных случаях или нет), а затем разделить их между успешными и неудачными, используя Collectors.partitioningBy():

List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

for (Map.Entry<Integer, List> entry : map1.entrySet()) {
    CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
            .supplyAsync(
                    () ->
                //API call which returns object of type RoomTypes.RoomType
                updateService.updateRoom(51, 33, 759, entry.getKey(),
                        new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                entry.getValue())),
                    yourOwnExecutor
            );

    completableFutures.add(requestCompletableFuture);
}

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
        // avoid throwing an exception in the join() call
        .exceptionally(ex -> null)
        .join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
        completableFutures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally)));

Полученная карта будет содержать одну запись с true для неудачных фьючерсов и другую запись с ключом false для успешных. Затем вы можете проверить 2 записи, чтобы действовать соответственно.

Обратите внимание, что по сравнению с исходным кодом произошли 2 незначительных изменения:

  • requestCompletableFuture теперь CompletableFuture<RoomTypes.RoomType>
  • thenApply(responses::add) и список responses были удалены

Относительно регистрации / обработки исключений, просто добавьте соответствующие requestCompletableFuture.handle(), чтобы регистрировать их по отдельности, но оставьте requestCompletableFuture, а не тот, который получен в handle().

...