Обертывание и превращение одного CompleteableFuture <OlderCat>в массовую операцию с результатом CompleteableFuture> - PullRequest
2 голосов
/ 06 июня 2019

У нас есть асинхронный метод:

public CompletableFuture<OlderCat> asyncGetOlderCat(String catName)

Приведен список кошек:

List<Cat> cats;

Нам нравится создавать массовую операцию, которая приведет к отображению между именем кота и его асинхронным результатом:

public CompletableFuture<Map<String, OlderCat>>

Нам также нравится, что если из asyncGetOlderCat было сгенерировано исключение, кот не будет добавлен на карту.

Мы читали этот пост , а также этот , и мы придумали этот код:

List<Cat> cats = ...

Map<String, CompletableFuture<OlderCat>> completableFutures = cats
            .stream()
            .collect(Collectors.toMap(Cat::getName,
                    c -> asynceGetOlderCat(c.getName())
                         .exceptionally( ex -> /* null?? */  ))
            ));


CompletableFuture<Void> allFutures = CompletableFuture
            .allOf(completableFutures.values().toArray(new CompletableFuture[completableFutures.size()]));

return allFutures.thenApply(future -> completableFutures.keySet().stream()
            .map(CompletableFuture::join) ???
            .collect(Collectors.toMap(????)));

Но не ясно, как в allFutures мы можем получить доступ к имени кошки и как сопоставить OlderCat и имя кошки.

Можно ли этого достичь?

Ответы [ 2 ]

1 голос
/ 06 июня 2019

Вы почти у цели. Вам не нужно ставить exceptionally() на начальные фьючерсы, но вы должны использовать handle() вместо thenApply() после allOf(), потому что если в будущем произойдет сбой, allOf() также потерпит неудачу.

При обработке фьючерсов вы можете просто отфильтровать неудачные из результата и восстановить ожидаемую карту:

Map<String, CompletableFuture<OlderCat>> completableFutures = cats
        .stream()
        .collect(toMap(Cat::getName, c -> asyncGetOlderCat(c.getName())));

CompletableFuture<Void> allFutures = CompletableFuture
        .allOf(completableFutures.values().toArray(new CompletableFuture[0]));

return allFutures.handle((dummy, ex) ->
        completableFutures.entrySet().stream()
                .filter(entry -> !entry.getValue().isCompletedExceptionally())
                .collect(toMap(Map.Entry::getKey, e -> e.getValue().join())));

Обратите внимание, что вызовы join() гарантированно не являются блокирующими, поскольку thenApply() будет выполняться только после завершения всех фьючерсов.

0 голосов
/ 06 июня 2019

Насколько я понимаю, вам нужно CompletableFuture со всеми результатами, приведенный ниже код делает именно то, что вам нужно

public CompletableFuture<Map<String, OlderCat>> getOlderCats(List<Cat> cats) {
    return CompletableFuture.supplyAsync(
            () -> {
                Map<String, CompletableFuture<OlderCat>> completableFutures = cats
                        .stream()
                        .collect(Collectors.toMap(Cat::getName,
                                c -> asyncGetOlderCat(c.getName())
                                        .exceptionally(ex -> {
                                            ex.printStackTrace();
                                            // if exception happens - return null
                                            // if you don't want null - save failed ones to separate list and process them separately
                                            return null;
                                        }))
                        );

                return completableFutures
                        .entrySet()
                        .stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> e.getValue().join()
                        ));
            }
    );
}

То, что он делает здесь - возвращает будущее, что создает внутри более полные будущееи ждет в конце.

...