Потоки Java ForkJoinPool не завершаются - PullRequest
0 голосов
/ 23 октября 2018

Я пытаюсь распараллелить цикл for, используя потоки Java и ForkJoinPool, чтобы контролировать количество используемых потоков.При запуске с одним потоком распараллеленный код возвращает тот же результат, что и последовательная программа.Последовательный код представляет собой набор стандартных циклов for:

for(String file : fileList){
    for(String item : xList){
        for(String x : aList) {
              // action code
        }
    }
}

И вот моя параллельная реализация:

ForkJoinPool threadPool = new ForkJoinPool(NUM_THREADS);
int chunkSize = aList.size()/NUM_THREADS;

for(String file : fileList){
    for(String item : xList){
    IntStream.range(0,  NUM_THREADS)
        .parallel().forEach(i -> threadPool.submit(() -> {

          aList.subList(i*chunkSize, Math.min(i*chunkSize + chunkSize -1, aList.size()-1))
               .forEach(x -> {
                      // action code
                });
          }));

         threadPool.shutdown();
         threadPool.awaitTermination(5, TimeUnit.MINUTES);
    }
}

При использовании более 1 потока, только ограниченное количествоитерации завершены.Я попытался использовать .shutdown() и .awaitTermination(), чтобы обеспечить завершение всех потоков, однако, похоже, это не работает.Количество итераций, которые значительно отличаются от запуска к запуску (между 0-1500).

Примечание: Я использую Macbook Pro с 8 доступными ядрами (4 двухъядерных)и мой код действия не содержит ссылок, которые делают распараллеливание небезопасным.

Любой совет будет высоко оценен, спасибо!

1 Ответ

0 голосов
/ 24 октября 2018

Я думаю, что настоящая проблема у вас вызвана вашим звонком shutdown на ForkJoinPool.Если вы посмотрите на Javadoc, это приведет к «упорядоченному завершению, в котором выполняются ранее отправленные задачи, но новые задачи не будут приняты» - т.е.Я бы ожидал, что на самом деле закончится только одна задача.

Кстати, нет никакого смысла в использовании ForkJoinPool так, как вы его используете.ForkJoinPool предназначен для рекурсивного разделения рабочей нагрузки, в отличие от того, что вы делаете с созданием подсписков в цикле, но предполагается, что ForkJoinPool будет подпитываться RecursiveAction с, которые сами разбивают свою работу, а не делят ее заранее.как вы делаете в цикле.Это всего лишь примечание;ваш код должен работать нормально, но было бы понятнее, если бы вы просто отправляли свои задачи на обычный ExecutorService, например, тот, который вы получаете Executors.newFixedThreadPool(parallelism), а не new ForkJoinPool().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...