Как разветвляться внутри цепочки CompletableFuture? - PullRequest
0 голосов
/ 26 октября 2018

Я хочу связать CompletableFuture так, чтобы он разворачивался в середине обработки.Под этим я подразумеваю, что у меня есть открытое CompletableFuture для списка, и я хочу применить вычисления для каждого элемента в этом списке.

Первым шагом является вызов m_myApi.getResponse (request, executor) , который выполняет асинхронный вызов.

Результатом этого асинхронного вызова является метод getCandidates .Я хочу проанализировать все эти кандидаты параллельно.

В настоящее время мой код анализирует их все последовательно

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .map(MyParser::ParseCandidates)
                                                   .collect(Collectors.toList()));
}

Я хочу что-то вроде этого:

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .PARSE_IN_PARALLEL_USING_EXECUTOR
}

Ответы [ 2 ]

0 голосов
/ 26 октября 2018

Как сказано в этого ответа , если Executor является пулом Fork / Join, есть (недокументированная) функция, которая запускает параллельный поток в одном из его рабочих потоков, будет выполнятьпараллельная работа с использованием этого исполнителя.

Если вы хотите поддерживать произвольные реализации Executor, все становится сложнее.Одно из решений выглядит так:

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
       @Nonnull final REQUEST request, @Nonnull final Executor executor)
{
    CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
    return candidates.thenComposeAsync(
        response -> {
            List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
                .stream()
                .map(CompletableFuture::completedFuture)
                .map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
                .collect(Collectors.toList());
            return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(x ->
                    list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                    executor);
        },
        executor);
}

Первое, что важно, это то, что мы должны отправить все потенциально асинхронные задания, прежде чем начинать ожидать выполнения любого из них, чтобы обеспечить максимальный параллелизм, который может поддерживать исполнитель.Следовательно, мы должны собрать все фьючерсы в List на первом шаге.

На втором шаге мы могли бы просто перебрать список и join всех фьючерсов.Если исполнитель является пулом Fork / Join и будущее еще не завершено, он обнаружит это и запустит поток компенсации, чтобы восстановить настроенный параллелизм.Однако, для произвольных исполнителей, мы не можем предполагать такую ​​особенность.Наиболее примечательно, если исполнитель является однопоточным исполнителем, это может привести к тупику.

Следовательно, решение использует CompletableFuture.allOf для выполнения операции итерации и объединения всех фьючерсов, только когда все они былизавершено уже.Следовательно, это решение никогда не заблокирует поток исполнителя, что делает его совместимым с любой реализацией Executor.

0 голосов
/ 26 октября 2018

Уже существует версия thenApply, которая принимает Executor в качестве дополнительного аргумента.

<U> CompletionStage<U>  thenApplyAsync​(Function<? super T,​? extends U> fn, Executor executor)

Если вы передаете туда исполнителя forkjoin, тогда параллельный поток внутрилямбда будет использовать переданного исполнителя вместо общего пула .

...