Количество потоков с подзадачами - PullRequest
1 голос
/ 03 апреля 2020

Оптимум потоков в пуле - это то, что определяется регистром c, хотя есть практическое правило , которое говорит #threads = #CPU +1. Однако, как это работает с потоками, охватывающими другие потоки и ожидающими (т.е. блокированными до тех пор, пока thread.join () не будет успешным) для этих «подзадач»?

Предположим, у меня есть код, который требует выполнения списка задач (2), который имеет подзадачи (2), который имеет подзадачи (3) и так далее. Общее количество задач составляет 2 * 2 * 3 = 12, хотя будет создано 18 потоков (поскольку потоки будут «порождать» больше подзадач (потоков), где поток, порождающий большее количество потоков, будет заблокирован, пока все не закончится. См. Ниже для псевдокода.

Я предполагаю, что для процессора с N ядрами есть практическое правило, что все может быть распараллелено, если наибольшее количество активных потоков (12) равно #CPU + 1. Это правильно? ?

Псевдокод

outputOfTask = []
for subtask in SubTaskList
   outputOfTask --> append(subtask.doCompute())
// wait untill all output is finished.

в подзадаче. java: Каждая подзадача, например, реализует один и тот же интерфейс, но может отличаться.

   outputOfSubtask = []
    for task in subsubTaskList
        // do some magic depending on the type of subtask
        outputOfSubtask -> append( task.doCompute())
    return outputOfSubtask

в подзадаче. java:

 outputOfSubsubtask = []
    for task in subsubsubtask
        // do some magic depending on the type of subsubtask
        outputOfSubsubtask -> append( task.doCompute())
    return outputOfSubsubtask

РЕДАКТИРОВАТЬ: Dummy code Java code. Я использовал это в своем первоначальном вопросе, чтобы проверить, сколько потоков было активно , но я предполагаю, что псевдокод более четкий. Обратите внимание: я использовал коллекцию Eclipse , это вводит функцию asParallel, которая обеспечивает более короткую запись кода.

@Test
public void testasParallelthreads() {
    // // ExecutorService executor = Executors.newWorkStealingPool();
    ExecutorService executor = Executors.newCachedThreadPool();

    MutableList<Double> myMainTask = Lists.mutable.with(1.0, 2.0);
    MutableList<Double> mySubTask = Lists.mutable.with(1.0, 2.0);
    MutableList<Double> mySubSubTask = Lists.mutable.with(1.0, 2.0);
    MutableList<Double> mySubSubSubTask = Lists.mutable.with(1.0, 2.0, 2.0);

    MutableList<Double> a = myMainTask.asParallel(executor, 1)
            .flatCollect(task -> mySubTask.asParallel(executor,1)
            .flatCollect(subTask -> mySubSubTask.asParallel(executor, 1)
            .flatCollect(subsubTask   -> mySubSubSubTask.asParallel(executor, 1)
            .flatCollect(subsubTask -> dummyFunction(task, subTask, subsubTask, subsubTask,executor))
            .toList()).toList()).toList()).toList();        

    System.out.println("pool size: " + ((ThreadPoolExecutor) executor).getPoolSize());
    executor.shutdownNow();
}

private MutableList<Double> dummyFunction(double a, double b, double c, double d, ExecutorService ex) {
    System.out.println("ThreadId: " + Thread.currentThread().getId());
    System.out.println("Active threads size: " + ((ThreadPoolExecutor) ex).getActiveCount());
    return Lists.mutable.with(a,b,c,d);
}

Ответы [ 2 ]

0 голосов
/ 03 апреля 2020

Я предполагаю, что для процессора с N ядрами существует практическое правило, что все можно распараллелить, если наибольшее количество активных потоков (12) равно #CPU + 1. Это правильно?

Эта топика c чрезвычайно сложна для обобщения. Даже с кодом фактический производительность вашего приложения будет очень трудно определить. Даже если бы вы могли прийти к оценке, реальная производительность может сильно отличаться между запусками - особенно если учесть, что потоки взаимодействуют друг с другом. Единственный раз, когда мы можем взять число #CPU + 1, это если задания, отправляемые в пул потоков, независимы и полностью связаны с процессором.

Я бы порекомендовал попробовать несколько различных значений размера пула потоков. под смоделированной нагрузкой, чтобы найти оптимальные значения для вашего приложения. Изучение общей пропускной способности или статистики загрузки системы должно дать вам необходимую обратную связь.

0 голосов
/ 03 апреля 2020

Однако, как это работает с потоками, охватывающими другие потоки и ожидающими (т.е. блокированными до тех пор, пока thread.join () не будет успешным) для этих «подзадач»?

Потоки будут блокироваться, и это до OS / JVM, чтобы запланировать еще один, если это возможно. Если у вас есть один исполнитель пула потоков и вызовите соединение из одной из ваших задач, другая задача даже не запустится. В случае исполнителей, использующих больше потоков, задача блокировки будет блокировать один поток, а os / jvm может свободно планировать другие потоки.

Эти заблокированные потоки не должны потреблять процессорное время, поскольку они заблокированы , Поэтому я предполагаю, что для процессора с N ядрами существует практическое правило: все можно распараллелить, если наибольшее количество активных потоков (24) равно #CPU + 1. Это правильно?

Активные темы могут блокировать. Я думаю, что вы смешиваете здесь термины, #CPU, количество ядер и количество виртуальных ядер. Если у вас N физических ядер, то вы можете параллельно запускать N связанных с процессором задач. Если у вас есть другие типы блокирующих или очень коротких задач, тогда у вас может быть больше параллельных задач.

...