Как сказано в этого ответа , если Executor
является пулом Fork / Join, есть (недокументированная) функция, которая запускает параллельный поток в одном из его рабочих потоков, будет выполнятьпараллельная работа с использованием этого исполнителя.
Если вы хотите поддерживать произвольные реализации Executor
, все становится сложнее.Одно из решений выглядит так:
public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
return candidates.thenComposeAsync(
response -> {
List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
.stream()
.map(CompletableFuture::completedFuture)
.map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
.collect(Collectors.toList());
return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
.thenApplyAsync(x ->
list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
executor);
},
executor);
}
Первое, что важно, это то, что мы должны отправить все потенциально асинхронные задания, прежде чем начинать ожидать выполнения любого из них, чтобы обеспечить максимальный параллелизм, который может поддерживать исполнитель.Следовательно, мы должны собрать все фьючерсы в List
на первом шаге.
На втором шаге мы могли бы просто перебрать список и join
всех фьючерсов.Если исполнитель является пулом Fork / Join и будущее еще не завершено, он обнаружит это и запустит поток компенсации, чтобы восстановить настроенный параллелизм.Однако, для произвольных исполнителей, мы не можем предполагать такую особенность.Наиболее примечательно, если исполнитель является однопоточным исполнителем, это может привести к тупику.
Следовательно, решение использует CompletableFuture.allOf
для выполнения операции итерации и объединения всех фьючерсов, только когда все они былизавершено уже.Следовательно, это решение никогда не заблокирует поток исполнителя, что делает его совместимым с любой реализацией Executor
.