Оптимум потоков в пуле - это то, что определяется регистром c, хотя есть практическое правило , которое говорит #threads = #CPU +1. Однако, как это работает с потоками, охватывающими другие потоки и ожидающими (т.е. блокированными до тех пор, пока thread.join () не будет успешным) для этих «подзадач»?
Предположим, у меня есть код, который требует выполнения списка задач (2), который имеет подзадачи (2), который имеет подзадачи (3) и так далее. Общее количество задач составляет 2 * 2 * 3 = 12, хотя будет создано 18 потоков (поскольку потоки будут «порождать» больше подзадач (потоков), где поток, порождающий большее количество потоков, будет заблокирован, пока все не закончится. См. Ниже для псевдокода.
Я предполагаю, что для процессора с N ядрами есть практическое правило, что все может быть распараллелено, если наибольшее количество активных потоков (12) равно #CPU + 1. Это правильно? ?
Псевдокод
outputOfTask = []
for subtask in SubTaskList
outputOfTask --> append(subtask.doCompute())
// wait untill all output is finished.
в подзадаче. java: Каждая подзадача, например, реализует один и тот же интерфейс, но может отличаться.
outputOfSubtask = []
for task in subsubTaskList
// do some magic depending on the type of subtask
outputOfSubtask -> append( task.doCompute())
return outputOfSubtask
в подзадаче. java:
outputOfSubsubtask = []
for task in subsubsubtask
// do some magic depending on the type of subsubtask
outputOfSubsubtask -> append( task.doCompute())
return outputOfSubsubtask
РЕДАКТИРОВАТЬ: Dummy code Java code. Я использовал это в своем первоначальном вопросе, чтобы проверить, сколько потоков было активно , но я предполагаю, что псевдокод более четкий. Обратите внимание: я использовал коллекцию Eclipse , это вводит функцию asParallel
, которая обеспечивает более короткую запись кода.
@Test
public void testasParallelthreads() {
// // ExecutorService executor = Executors.newWorkStealingPool();
ExecutorService executor = Executors.newCachedThreadPool();
MutableList<Double> myMainTask = Lists.mutable.with(1.0, 2.0);
MutableList<Double> mySubTask = Lists.mutable.with(1.0, 2.0);
MutableList<Double> mySubSubTask = Lists.mutable.with(1.0, 2.0);
MutableList<Double> mySubSubSubTask = Lists.mutable.with(1.0, 2.0, 2.0);
MutableList<Double> a = myMainTask.asParallel(executor, 1)
.flatCollect(task -> mySubTask.asParallel(executor,1)
.flatCollect(subTask -> mySubSubTask.asParallel(executor, 1)
.flatCollect(subsubTask -> mySubSubSubTask.asParallel(executor, 1)
.flatCollect(subsubTask -> dummyFunction(task, subTask, subsubTask, subsubTask,executor))
.toList()).toList()).toList()).toList();
System.out.println("pool size: " + ((ThreadPoolExecutor) executor).getPoolSize());
executor.shutdownNow();
}
private MutableList<Double> dummyFunction(double a, double b, double c, double d, ExecutorService ex) {
System.out.println("ThreadId: " + Thread.currentThread().getId());
System.out.println("Active threads size: " + ((ThreadPoolExecutor) ex).getActiveCount());
return Lists.mutable.with(a,b,c,d);
}