Ваш пример немного странный, так как вы замедляете основной поток в returnUserIdsFromDb()
, прежде чем какая-либо операция даже запустится, и аналогично fetchById
замедляет вызывающую, а не асинхронную операцию, что сводит на нет всю цель асинхронной операции.
Далее, вместо .thenCompose(listOfIds -> CompletableFuture.supplyAsync(() -> …))
вы можете просто использовать .thenApplyAsync(listOfIds -> …)
.
Так что лучшим примером может быть
public class PipelineOfTasksExample {
private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
.collect(Collectors.toMap(id -> id, id -> "user"+id));
PipelineOfTasksExample() {}
private static <T> T slowDown(String op, T result) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
System.out.println(op + " -> " + result + " thread: "
+ Thread.currentThread().getName()+ ", "
+ POOL.getPoolSize() + " threads");
return result;
}
private CompletableFuture<List<Long>> returnUserIdsFromDb() {
System.out.println("trigger building the list of Ids - thread: "
+ Thread.currentThread().getName());
return CompletableFuture.supplyAsync(
() -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
POOL);
}
private CompletableFuture<String> fetchById(Long id) {
System.out.println("trigger fetching id: " + id + " thread: "
+ Thread.currentThread().getName());
return CompletableFuture.supplyAsync(
() -> slowDown("fetching id: " + id , db.get(id)), POOL);
}
static ForkJoinPool POOL = new ForkJoinPool(2);
public static void main(String[] args) {
PipelineOfTasksExample example = new PipelineOfTasksExample();
CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
.thenApplyAsync(listOfIds ->
listOfIds.parallelStream()
.map(id -> example.fetchById(id).join())
.collect(Collectors.toList()
),
POOL
);
System.out.println(result.join());
}
}
который печатает что-то вроде
trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 4 thread: ForkJoinPool-1-worker-2
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-2, 4 threads
trigger fetching id: 1 thread: ForkJoinPool-1-worker-3
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-2, 4 threads
[user1, user2, user3, user4]
, что может показаться удивительным количеством потоков на первый взгляд.
Ответ заключается в том, что join()
может блокировать поток, но если это происходит внутри рабочего потока пула Fork / Join, эта ситуация будет обнаружена, и будет запущен новый поток компенсации, чтобы обеспечить сконфигурированный целевой параллелизм .
В особом случае, когда используется пул Fork / Join по умолчанию, реализация может подобрать новые отложенные задачи в методе join()
, чтобы обеспечить прогресс в том же потоке.
Таким образом, код всегда будет прогрессировать, и нет ничего плохого в том, чтобы время от времени вызывать join()
, если альтернативы намного сложнее, но существует некоторая опасность слишком большого потребления ресурсов, если его использовать чрезмерно. Ведь причиной использования пулов потоков является ограничение количества потоков.
Альтернатива - использовать цепные зависимые операции, где это возможно.
public class PipelineOfTasksExample {
private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
.collect(Collectors.toMap(id -> id, id -> "user"+id));
PipelineOfTasksExample() {}
private static <T> T slowDown(String op, T result) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
System.out.println(op + " -> " + result + " thread: "
+ Thread.currentThread().getName()+ ", "
+ POOL.getPoolSize() + " threads");
return result;
}
private CompletableFuture<List<Long>> returnUserIdsFromDb() {
System.out.println("trigger building the list of Ids - thread: "
+ Thread.currentThread().getName());
return CompletableFuture.supplyAsync(
() -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
POOL);
}
private CompletableFuture<String> fetchById(Long id) {
System.out.println("trigger fetching id: " + id + " thread: "
+ Thread.currentThread().getName());
return CompletableFuture.supplyAsync(
() -> slowDown("fetching id: " + id , db.get(id)), POOL);
}
static ForkJoinPool POOL = new ForkJoinPool(2);
public static void main(String[] args) {
PipelineOfTasksExample example = new PipelineOfTasksExample();
CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
.thenComposeAsync(listOfIds -> {
List<CompletableFuture<String>> jobs = listOfIds.parallelStream()
.map(id -> example.fetchById(id))
.collect(Collectors.toList());
return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
.thenApply(_void -> jobs.stream()
.map(CompletableFuture::join).collect(Collectors.toList()));
},
POOL
);
System.out.println(result.join());
System.out.println(ForkJoinPool.commonPool().getPoolSize());
}
}
Разница в том, что сначала отправляются все асинхронные задания, затем запланировано зависимое действие, вызывающее для них join
, которое будет выполняться только после завершения всех заданий, поэтому эти вызовы join
никогда не будут блокироваться. Только последний вызов join
в конце метода main
может блокировать основной поток.
Так что это печатает что-то вроде
trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 4 thread: ForkJoinPool-1-worker-1
trigger fetching id: 1 thread: ForkJoinPool-1-worker-0
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-1, 2 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-0, 2 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-1, 2 threads
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-0, 2 threads
[user1, user2, user3, user4]
показывает, что не нужно создавать потоки компенсации, поэтому количество потоков соответствует настроенному целевому параллелизму.
Обратите внимание, что если фактическая работа выполняется в фоновом потоке, а не в самом методе fetchById
, теперь вам больше не нужен параллельный поток, поскольку нет блокирующего вызова join()
. Для таких сценариев простое использование stream()
обычно приводит к повышению производительности.