Небольшое улучшение по сравнению с отправленным вами ответом может заключаться в использовании ConcurrentHashMap
в качестве своего рода кэша, так что ваш окончательный список содержит те же результаты, независимо от порядка, в котором вы их получили:
Map<Integer, CompletableFuture<Integer>> seen = new ConcurrentHashMap<>();
List<CompletableFuture<Integer>> intFutures = Stream.iterate(0, i -> i + 1)
.limit(5)
.map(i -> CompletableFuture.supplyAsync(() -> runStage1(i)))
.map(cf -> cf.thenCompose(result ->
seen.computeIfAbsent(
result, res -> CompletableFuture.supplyAsync(() -> runStage2(res))
)
))
.collect(Collectors.toList());
Обратите внимание, что важно, чтобы функция, переданная в computeIfAbsent()
, немедленно возвращалась (например, с использованием supplyAsync()
), потому что она удерживает блокировку внутри карты во время выполнения.Кроме того, эта функция не должна пытаться изменить карту seen
, поскольку может вызвать проблемы .
При таком изменении выходные данные могут быть, например:
stage - 1: 1
stage - 1: 0
stage - 1: 2
stage - 2: 1
stage - 2: 2
stage - 1: 3
stage - 2: 0
stage - 1: 4
0
1
2
0
1
Кроме того, это позволяет проверить карту seen
после завершения всех фьючерсов, чтобы получить уникальные результаты.