Простой list.parallelStream () в потоке Java 8, кажется, не работает кража? - PullRequest
0 голосов
/ 11 мая 2018

Из этого вопроса " Будут ли внутренние параллельные потоки обрабатываться полностью параллельно перед рассмотрением распараллеливания внешнего потока? ", я понял, что потоки выполняют кражу работы.Тем не менее, я заметил, что это часто не происходит.Например, если у меня есть список, скажем, 100 000 элементов, и я пытаюсь обработать его в параллельном режиме (), я часто замечаю в конце, что большинство моих процессорных ядер бездействуют в состоянии «ожидания».(Примечание: из 100 000 элементов в списке некоторые элементы занимают много времени для обработки, тогда как другие работают быстро; список не сбалансирован, поэтому некоторые потоки могут стать «неудачливыми» и иметь много дел, тогда какдругим повезло, и им нечего делать).

Итак, моя теория состоит в том, что JIT-компилятор делает первоначальное деление 100 000 элементов на 16 потоков (потому что у меня 16 ядер), но затем внутри каждого потока,он просто выполняет простой (последовательный) цикл for (так как это будет наиболее эффективно), и, следовательно, никогда не произойдет кража работы (что я и вижу).

Я думаю, почему Будут ли внутренние параллельные потоки обрабатываться полностью параллельно перед рассмотрением распараллеливания внешнего потока? показал, что кража работы заключается в том, что существует OUTER-цикл, который транслировал , и INNER LOOP, который был потоковым, и т.д.в этом случае каждый внутренний цикл оценивается во время выполнения и создает новые задачи, которые во время выполнения могут быть назначены на "простоя"hreads.Мысли?Есть ли что-то, что я делаю неправильно, что бы "заставить" простой list.parallelStream () использовать кражу работы?(Мой текущий обходной путь - попытаться сбалансировать список на основе различных эвристик, чтобы каждый поток видел, как правило, один и тот же объем работы; но это трудно предсказать ...)

1 Ответ

0 голосов
/ 12 мая 2018

Это не имеет ничего общего с JIT-компилятором, но с реализацией Stream API.Он разделит рабочую нагрузку на куски, которые последовательно обрабатываются рабочими потоками.Общая стратегия состоит в том, чтобы иметь больше заданий, чем рабочих потоков, для включения кражи работы, см., Например, ForkJoinTask.getSurplusQueuedTaskCount(), который можно использовать для реализации такой адаптивной стратегии.

Следующий кодможет использоваться для определения, сколько элементов было обработано последовательно, когда источником является ArrayList:

List<Object> list = new ArrayList<>(Collections.nCopies(10_000, ""));
System.out.println(System.getProperty("java.version"));
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println( list.parallelStream()
    .collect(
        () -> new ArrayList<>(Collections.singleton(0)),
        (l,x) -> l.replaceAll(i -> i + 1),
        List::addAll) );

На моем текущем тестовом компьютере он печатает:

1.8.0_60
4
[625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625]

Так что естьбольше кусков, чем ядер, чтобы позволить краже работы.Однако, как только началась последовательная обработка фрагмента, он не может быть разделен далее, поэтому эта реализация имеет ограничения, когда время выполнения для каждого элемента существенно различается.Это всегда компромисс.

...