Почему stream parallel () не использует все доступные потоки? - PullRequest
2 голосов
/ 21 января 2020

Я попытался запустить 100 Sleep задач параллельно, используя Java8 (1.8.0_172) stream.parallel (), представленный внутри пользовательского ForkJoinPool с более чем 100 доступными потоками. Каждое задание будет спать в течение 1 с. Я ожидал, что вся работа закончится sh через ~ 1 с, учитывая, что 100 снов можно было выполнять параллельно. Однако я наблюдаю время выполнения 7 с.

    @Test
    public void testParallelStream() throws Exception {
        final int REQUESTS = 100;
        ForkJoinPool forkJoinPool = null;
        try {
            // new ForkJoinPool(256): same results for all tried values of REQUESTS
            forkJoinPool = new ForkJoinPool(REQUESTS);
            forkJoinPool.submit(() -> {

                IntStream stream = IntStream.range(0, REQUESTS);
                final List<String> result = stream.parallel().mapToObj(i -> {
                    try {
                        System.out.println("request " + i);
                        Thread.sleep(1000);
                        return Integer.toString(i);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }).collect(Collectors.toList());
                // assertThat(result).hasSize(REQUESTS);
            }).join();
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

С выводом, указывающим ~ 16 потоковых элементов, выполняемых до паузы 1 с, затем еще ~ 16 и т. Д. Таким образом, кажется, что хотя forkjoinpool был создан из 100 потоков, привыкли использовать только ~ 16.

Этот шаблон появляется, как только я использую более 23 потоков:

1-23 threads: ~1s
24-35 threads: ~2s
36-48 threads: ~3s
...
System.out.println(Runtime.getRuntime().availableProcessors());
// Output: 4

Ответы [ 2 ]

4 голосов
/ 21 января 2020

Поскольку использование реализацией Stream пула Fork / Join является деталью реализации, уловка, заставляющая его использовать другой пул Fork / Join, также не документирована и, кажется, работает случайно, то есть существует жестко закодированный константа , определяющая фактический параллелизм, в зависимости от параллелизма пула по умолчанию. Поэтому использование другого пула изначально не предусматривалось.

Однако было признано, что использование другого пула с неуместным целевым параллелизмом является ошибкой, даже если этот прием не задокументирован, см. JDK -8190974 .

Это было исправлено в Java 10 и перенесено в Java 8, обновление 222.

Так что простым миром решений будет обновление Java версии.

Вы также можете изменить параллелизм пула по умолчанию, например,

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");

перед выполнением любого действия Fork / Join.

Но это может иметь непредвиденные последствия для других параллельных операций.

2 голосов
/ 21 января 2020

Когда вы написали это, вы позволяете потоку определять параллелизм выполнения.

В результате получается, что ArrayList.parallelStream пытается перехитрить вас, разделив данные равномерно, без учета числа доступные темы во внимание. Это хорошо для операций с привязкой к ЦП, когда бесполезно иметь больше потоков, чем ядер ЦП, но оно не предназначено для процессов, которые должны ожидать ввода-вывода. ForkJoinPool, поэтому он вынужден использовать все доступные потоки?

        IntStream stream = IntStream.range(0, REQUESTS);
        List<ForkJoinTask<String>> results
                = stream.mapToObj(i -> forkJoinPool.submit(() -> {

            try {
                System.out.println("request " + i);
                Thread.sleep(1000);
                return Integer.toString(i);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })).collect(Collectors.toList());
        results.forEach(ForkJoinTask::join);

На моем компьютере это занимает менее двух секунд.

...