Почему медленная задача блокирует другие меньшие задачи в этой программе, которая использует параллельные потоки? - PullRequest
0 голосов
/ 28 января 2019

После вопроса от коллеги о параллельных потоках я написал следующий код для проверки чего-либо.

public class Test {

    public static void main(String args[]) {
        List<Runnable> list = new LinkedList<>();
        list.add(() -> {
            try {
                Thread.sleep(10000);
                System.out.println("Time : " + System.nanoTime() + " " + "Slow task");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        for (int i = 0; i < 1000; i++) {
            int j = i;
            list.add(() -> System.out.println("Time : " + System.nanoTime() + " " + j));
        }
        list.parallelStream().forEach(r -> r.run());
    }
}

Странно, но вывод всегда выглядит примерно так:

Time : 4096118049370412 61
Time : 4096118049567530 311
Time : 4096118049480238 217
Time : 4096118049652415 405
Time : 4096118049370678 436
Time : 4096118049370575 155
Time : 4096118049720639 437
Time : 4096118049719368 280
Time : 4096118049804630 281
Time : 4096118049684148 406
Time : 4096118049660398 218

TRUNCATED  

Time : 4096118070511768 669
Time : 4096118070675678 670
Time : 4096118070584951 426
Time : 4096118070704143 427
Time : 4096118070714441 428
Time : 4096118070722080 429
Time : 4096118070729569 430
Time : 4096118070736782 431
Time : 4096118070744069 432
Time : 4096118070751286 433
Time : 4096118070758554 434
Time : 4096118070765913 435
Time : 4096118070550370 930
Time : 4096118070800538 931
Time : 4096118070687425 671
Time : 4096118070813669 932
Time : 4096118070827794 672
Time : 4096118070866089 933
Time : 4096118070881358 673
Time : 4096118070895344 934
Time : 4096118070907608 674
Time : 4096118070920712 935
Time : 4096118070932934 675
Time : 4096118070945131 936
Time : 4096118070957850 676
Time : 4096118070982326 677
Time : 4096118070991158 678
Time : 4096118070999002 679
Time : 4096118071006501 680
Time : 4096118071017766 681
Time : 4096118071025766 682
Time : 4096118071033318 683
Time : 4096118071070603 684
Time : 4096118071080240 685
Time : 4096128063025914 Slow task
Time : 4096128063123940 0
Time : 4096128063148135 1
Time : 4096128063173285 2
Time : 4096128063176723 3
Time : 4096128063179939 4
Time : 4096128063183077 5
Time : 4096128063191001 6
Time : 4096128063194156 7
Time : 4096128063197273 8
Time : 4096128063200395 9
Time : 4096128063203581 10
Time : 4096128063206988 11
Time : 4096128063210155 12
Time : 4096128063213285 13
Time : 4096128063216411 14
Time : 4096128063219542 15
Time : 4096128063222733 16
Time : 4096128063232190 17
Time : 4096128063235653 18
Time : 4096128063238827 19
Time : 4096128063241962 20
Time : 4096128063245176 21
Time : 4096128063248296 22
Time : 4096128063251444 23
Time : 4096128063254557 24
Time : 4096128063257705 25
Time : 4096128063261566 26
Time : 4096128063264733 27
Time : 4096128063268115 28
Time : 4096128063272851 29

Process finished with exit code 0

То есть всегда есть некоторые задачи, ожидающие завершения медленной задачи, даже если все другие задачи завершены.Я бы предположил, что медленная задача должна занимать только один поток, а все остальные задачи должны завершаться без проблем, и только медленная задача должна занимать полные 10 секунд.У меня 8 процессоров, поэтому уровень параллелизма равен 7.

Что может быть причиной для этого?

Чтобы добавить больше информации, код предназначен только для понимания.Я не собираюсь нигде это делать.

1 Ответ

0 голосов
/ 28 января 2019

Существуют некоторые ограниченные возможности, когда речь идет о краже работы с потоками , поэтому, если один поток привязан к какой-либо работе в других исполнителях, эта работа будет заблокирована, пока она не закончит обработку других задач..

Вы можете визуализировать это, добавив несколько дополнительных замечаний по отладке вокруг вашего кода ...

class Test {

    public static void main(String[] args) {
        List<Runnable> list = new LinkedList<>();
        list.add(() -> {
            try {
                System.out.println("Long sleep - " + Thread.currentThread().getName());
                Thread.sleep(10000);
                System.out.println("Time : " + System.nanoTime() + " " + "Slow task");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        for (int i = 0; i < 1000; i++) {
            int j = i;
            list.add(() -> System.out.println("Time : " + System.nanoTime() + " " + j));
        }
        list.parallelStream().forEach(r -> {
            System.out.println(Thread.currentThread().getName());
            r.run();
            System.out.println();
        });
    }
}

После запуска я вижу следующее сообщение:

Long sleep - ForkJoinPool.commonPool-worker-4

... и примерно через десять секунд ...

Time : 11525122027429 Slow task

ForkJoinPool.commonPool-worker-4
Time : 11525122204035 0

ForkJoinPool.commonPool-worker-4
Time : 11525122245739 1

ForkJoinPool.commonPool-worker-4
Time : 11525122267015 2

ForkJoinPool.commonPool-worker-4
Time : 11525122286921 3

ForkJoinPool.commonPool-worker-4
Time : 11525122306266 4

ForkJoinPool.commonPool-worker-4
Time : 11525122338787 5

ForkJoinPool.commonPool-worker-4
Time : 11525122357288 6

ForkJoinPool.commonPool-worker-4
Time : 11525122376716 7

ForkJoinPool.commonPool-worker-4
Time : 11525122395218 8

ForkJoinPool.commonPool-worker-4
Time : 11525122414165 9

ForkJoinPool.commonPool-worker-4
Time : 11525122432755 10

ForkJoinPool.commonPool-worker-4
Time : 11525122452805 11

ForkJoinPool.commonPool-worker-4
Time : 11525122472624 12

ForkJoinPool.commonPool-worker-4
Time : 11525122491380 13

ForkJoinPool.commonPool-worker-4
Time : 11525122514417 14

ForkJoinPool.commonPool-worker-4
Time : 11525122534550 15

ForkJoinPool.commonPool-worker-4
Time : 11525122553751 16

Итак, это означает, что на моем ящике у работника-4 была запланирована какая-то работа, которая не моглане может быть украдено на основании того факта, что были некоторые неровные куски.Примечание: если поток обрабатывает задачу в чанке, эта работа больше не будет прерываться.

[31, 31, 31, 32, 31, 31, 31, 32, 31, 31, 31, 32, 31, 31, 31, 32, 31, 31, 31, 32, 31, 31, 31, 32, 31, 31, 31, 32, 31, 32, 31, 32, 0]

Если вы искали реализацию с потоками, которая могла бы украсть работу из потоков, которыеработает дольше, лучше использовать пул для кражи работы напрямую.

class Test {

    public static void main(String[] args) throws InterruptedException {
        List<Runnable> list = new LinkedList<>();
        list.add(() -> {
            try {
                System.out.println("Long sleep - " + Thread.currentThread().getName());
                Thread.sleep(10000);
                System.out.println("Time : " + System.nanoTime() + " " + "Slow task");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        for (int i = 0; i < 1000; i++) {
            int j = i;
            list.add(() -> {
                System.out.println(Thread.currentThread().getName());
                System.out.println("Time : " + System.nanoTime() + " " + j);
                System.out.println();
            });
        }


        final ExecutorService stealingPool = Executors.newWorkStealingPool();
        list.forEach(stealingPool::execute);
        stealingPool.shutdown();
        stealingPool.awaitTermination(15, TimeUnit.SECONDS);
    }
}

Выше приведен более надежный и разумный результат в конце списка:

Time : 12210445469314 Slow task

... что означает, что вся доступная работа была обработана за отведенное время (15 секунд).

...