Использование ThreadPoolTaskSupport в качестве поддержки задач для параллельных коллекций в Scala - PullRequest
0 голосов
/ 16 апреля 2019

У меня есть список из 50-500 задач, связанных с вводом-выводом, каждая из которых может выполняться от 3 до 20 минут. Список вычисляется раньше, и никакие новые задачи не добавляются рекурсивно. Я хочу запустить все это с фиксированным размером пула (скажем, 4). Задачи не зависят от других задач.

Так что я думаю, что это подходит для варианта использования ThreadPoolTaskSupport лучше, чем ForkJoinTaskSupport. Но ThreadPoolTaskSupport устарела, а ForkJoinTaskSupport является только рекомендуемой альтернативой.

Я пытался использовать ForkJoinTaskSupport, но, похоже, когда оставляются последние 8 или около того задач, он начинает убивать потоки, поэтому последние 3-4 задачи выполняются в одном потоке, добавляя 1 час к общему времени выполнения.

Любой способ исправить это поведение в ForkJoinTaskSupport или я должен использовать ThreadPoolTaskSupport с фиксированным размером, несмотря на то, что он устарел?

Код для проверки этого.

object ThreadLibTest  {
  def main(arr:Array[String]):Unit = {
    val tasks = ListBuffer.empty[() => Unit]
    for (i <- 1 to 65) {
      tasks += {() => {
          Thread.sleep(1000)
//          println("finishing " + i + " id " + Thread.currentThread().getName)
      }
      }
    }

    val ptasks = tasks.par
    val fjp = new ForkJoinPool(6)
    ptasks.tasksupport = new ForkJoinTaskSupport(fjp)
    ptasks.map(x => {logfjp(fjp); x.apply()})

  }

  def logfjp(pool: ForkJoinPool) {
    println(
                " activeThreads=" + pool.getActiveThreadCount() +
                " runningThreads=" + pool.getRunningThreadCount() +
                " poolSize=" + pool.getPoolSize() +
                " queuedTasks=" + pool.getQueuedTaskCount() +
                " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
                " parallelism=" + pool.getParallelism() +
                " stealCount=" + pool.getStealCount());    
  }

}
Last few outputs.
 activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=6 runningThreads=1 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=1 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=4 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=1 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3

РЕДАКТИРОВАТЬ: Использовал ThreadPoolExecutor фиксированного размера в качестве taskSupport, снова та же проблема, только 2 потока из 6 выполняли последние несколько задач.

Явная отправка задач в ThreadPoolExecutor дала правильный результат. Все потоки были активны, пока все задачи не были закончены. Еще предстоит выяснить, почему настройка в taskSupport не работает. Явная отправка задач в объединяющий пул Fork или в пул потоков завершается за 11 секунд, в то время как файл tasks.par с любым из них занимает 17 секунд. Может быть из-за shouldSplitFurther, определенного в IterableSplitter?

...