Задачи подсчета ядра CompletableFuture медленнее, чем parallelStream - PullRequest
0 голосов
/ 02 июня 2018

Мой компьютер с четырьмя ядрами (FYI)

CompletableFuture будет использовать ForkJoinPool.commonPool() в качестве официального документа указывает:

Все асинхронные методы безявный аргумент Executor выполняется с помощью ForkJoinPool.commonPool () (если только он не поддерживает уровень параллелизма как минимум два, и в этом случае создается новый поток для выполнения каждой задачи).

Я отладил и обнаружил следующий код из CompletableFuture.supplyAsync(Supplier<U> supplier)

private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

Что означает, что parallelStream всегда использует ForkJoinPool.commonPool(), но вот почему это быстрее.

Я попытался распечатать их и обнаружил, что только три нити при использовании CompletableFuture:

private static int concurrencyGet() {
    List<CompletableFuture<Integer>> futureList = IntStream.rangeClosed(0, 10).boxed()
   .map(i -> CompletableFuture.supplyAsync(() -> getNumber(i)))
            .collect(Collectors.toList());
    return futureList.stream().map(future -> future.join()).reduce(0, Integer::sum);
}

Но ParallelsStream используют четыре , включая основная нить.

Я предполагаю, что в CompletableFuture.supplyAsync() ForkJoinPool.getCommonPoolParallelism() составляет всего три , в то время как основной поток принимает один из четырех, поскольку он асинхронен .

Но parallelStream будет использовать все четыре , поскольку он не асинхронный .

Это правильно?Интересно, есть ли официальные документы по этому вопросу?

Спасибо за помощь.

1 Ответ

0 голосов
/ 03 июня 2018

Вот как я понял это из Venkat Subramaniam s talk на Параллельное и асинхронное программирование с потоками и CompletableFuture :

As CompleteableFuture также использует ForkJoinPool.commonPool(), он также может использовать основной поток, и это делает при определенных обстоятельствах.

Учитывая следующий пример

public static void main(String[] args) {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> numberSupplier());
    future.thenAccept(i -> System.out.println("f: " + i + " - " + Thread.currentThread()));
    sleep(100); //wait for async operations to finish before exiting
}

private static Integer numberSupplier() {
    Integer n = 2;
    System.out.println("c: " + n + " - " + Thread.currentThread());
    sleep(19);
    return n;
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

, вы можете получить консольный вывод, подобный этому:

c: 2 - Thread[ForkJoinPool.commonPool-worker-1,5,main]
f: 2 - Thread[ForkJoinPool.commonPool-worker-1,5,main]

И часть supplyAsync(..), и часть thenAccept(..) выполняются рабочим потоком из ForkJoinPool.

Однако, если задано Supplier<Integer>supplyAsync(..) настолько быстро, что завершается, когда вызывается thenAccept(..), тогда эта вторая часть также может быть выполнена в главном потоке:

private static Integer numberSupplier() {
    Integer n = 2;
    //System.out.println("c: " + n + " - " + Thread.currentThread());
    //sleep(19);
    return n;
}

Вывод:

f: 2 - Thread[main,5,main]
...