scala параллельные коллекции степень параллелизма - PullRequest
39 голосов
/ 24 марта 2011

Есть ли эквивалент в параллельных коллекциях scala к withDegreeOfParallelism LINQ, который устанавливает количество потоков, которые будут выполнять запрос? Я хочу запустить параллельную операцию, для которой требуется заданное количество потоков.

Ответы [ 2 ]

59 голосов
/ 24 марта 2011

В самой новой соединительной линии, используя JVM 1.6 или новее, используйте:

collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(parlevel: Int)

Впрочем, это может измениться в будущем. В следующих выпусках планируется более унифицированный подход к настройке всех параллельных API-интерфейсов задач Scala.

Обратите внимание, однако, что, хотя это будет определять количество процессоров, используемых запросом, это может быть не фактическое количество потоков, участвующих в выполнении запроса. Поскольку параллельные коллекции поддерживают вложенный параллелизм, фактическая реализация пула потоков может выделить больше потоков для выполнения запроса, если обнаружит, что это необходимо.

EDIT:

Начиная с Scala 2.10, предпочтительным способом установки уровня параллелизма является установка поля tasksupport для нового объекта TaskSupport, как в следующем примере:

scala> import scala.collection.parallel._
import scala.collection.parallel._

scala> val pc = mutable.ParArray(1, 2, 3)
pc: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3)

scala> pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@4a5d484a

scala> pc map { _ + 1 }
res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)

При создании экземпляра объекта ForkJoinTaskSupport с помощью пула соединений с вилками уровень параллелизма пула соединений с вилками должен быть установлен на нужное значение (2 в примере).

5 голосов
/ 23 мая 2012

Независимо от версии JVM, в Scala 2.9+ (введены параллельные коллекции) вы также можете использовать комбинацию функций grouped(Int) и par для выполнения параллельных заданий на небольших кусках, например:

scala> val c = 1 to 5
c: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5)

scala> c.grouped(2).seq.flatMap(_.par.map(_ * 2)).toList
res11: List[Int] = List(2, 4, 6, 8, 10)

grouped(2) создает чанки длиной 2 или меньше, seq гарантирует, что набор чанков не параллельный (бесполезный в этом примере), затем функция _ * 2 выполняется для небольших параллельных чанков (созданныхс par), таким образом гарантируя, что максимум 2 потока выполняются параллельно.

Это может быть, однако, несколько менее эффективным, чем установка параметра рабочего пула, я не уверен в этом.

...