У меня есть список из 50-500 задач, связанных с вводом-выводом, каждая из которых может выполняться от 3 до 20 минут. Список вычисляется раньше, и никакие новые задачи не добавляются рекурсивно. Я хочу запустить все это с фиксированным размером пула (скажем, 4). Задачи не зависят от других задач.
Так что я думаю, что это подходит для варианта использования ThreadPoolTaskSupport лучше, чем ForkJoinTaskSupport.
Но ThreadPoolTaskSupport устарела, а ForkJoinTaskSupport является только рекомендуемой альтернативой.
Я пытался использовать ForkJoinTaskSupport, но, похоже, когда оставляются последние 8 или около того задач, он начинает убивать потоки, поэтому последние 3-4 задачи выполняются в одном потоке, добавляя 1 час к общему времени выполнения.
Любой способ исправить это поведение в ForkJoinTaskSupport или я должен использовать ThreadPoolTaskSupport с фиксированным размером, несмотря на то, что он устарел?
Код для проверки этого.
object ThreadLibTest {
def main(arr:Array[String]):Unit = {
val tasks = ListBuffer.empty[() => Unit]
for (i <- 1 to 65) {
tasks += {() => {
Thread.sleep(1000)
// println("finishing " + i + " id " + Thread.currentThread().getName)
}
}
}
val ptasks = tasks.par
val fjp = new ForkJoinPool(6)
ptasks.tasksupport = new ForkJoinTaskSupport(fjp)
ptasks.map(x => {logfjp(fjp); x.apply()})
}
def logfjp(pool: ForkJoinPool) {
println(
" activeThreads=" + pool.getActiveThreadCount() +
" runningThreads=" + pool.getRunningThreadCount() +
" poolSize=" + pool.getPoolSize() +
" queuedTasks=" + pool.getQueuedTaskCount() +
" queuedSubmissions=" + pool.getQueuedSubmissionCount() +
" parallelism=" + pool.getParallelism() +
" stealCount=" + pool.getStealCount());
}
}
Last few outputs.
activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=6 runningThreads=1 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=1 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=4 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=1 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
РЕДАКТИРОВАТЬ: Использовал ThreadPoolExecutor фиксированного размера в качестве taskSupport, снова та же проблема, только 2 потока из 6 выполняли последние несколько задач.
Явная отправка задач в ThreadPoolExecutor дала правильный результат. Все потоки были активны, пока все задачи не были закончены. Еще предстоит выяснить, почему настройка в taskSupport не работает.
Явная отправка задач в объединяющий пул Fork или в пул потоков завершается за 11 секунд, в то время как файл tasks.par с любым из них занимает 17 секунд.
Может быть из-за shouldSplitFurther
, определенного в IterableSplitter
?