Параллельная карта / foreach в Scala - PullRequest
16 голосов
/ 18 ноября 2009

У меня есть итерация vals: Iterable[T] и долгосрочная функция без каких-либо соответствующих побочных эффектов: f: (T => Unit). Прямо сейчас это применяется к vals очевидным образом:

vals.foreach(f)

Я бы хотел, чтобы звонки на f выполнялись одновременно (в разумных пределах). Есть ли очевидная функция где-то в базовой библиотеке Scala? Что-то вроде:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

Несмотря на то, что f работает достаточно долго, он достаточно короткий, поэтому я не хочу, чтобы накладные расходы вызывали поток для каждого вызова, поэтому я ищу что-то, основанное на пуле потоков.

Ответы [ 7 ]

16 голосов
/ 01 октября 2014

Во многих ответах 2009 года все еще используется старый scala.actors.Futures._, которого больше нет в новой версии Scala. В то время как Akka является предпочтительным способом, гораздо более читаемый способ - просто использовать параллельные ( .par ) коллекции:

vals.foreach { v => f(v) }

становится

vals.par.foreach { v => f(v) }

В качестве альтернативы, использование parMap может показаться более лаконичным, хотя и с оговоркой, которую вы должны помнить, чтобы импортировать обычный Scalaz *. Как обычно, в Scala есть несколько способов сделать то же самое!

13 голосов
/ 19 ноября 2009

Скалаз имеет parMap. Вы бы использовали его следующим образом:

import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive

Это обеспечит каждый функтор (включая Iterable) методом parMap, так что вы можете просто сделать:

vals.parMap(f)

Вы также получаете parFlatMap, parZipWith и т. Д.

10 голосов
/ 18 ноября 2009

Мне нравится ответ Futures. Однако, хотя он будет выполняться одновременно, он также будет возвращаться асинхронно, что, вероятно, не то, что вам нужно. Правильный подход будет следующим:

import scala.actors.Futures._

vals map { x => future { f(x) } } foreach { _() }
3 голосов
/ 18 ноября 2009

У меня были некоторые проблемы с использованием scala.actors.Futures в Scala 2.8 (он глючил, когда я проверял). Использование java-библиотек напрямую для меня сработало:

final object Parallel {
  val cpus=java.lang.Runtime.getRuntime().availableProcessors
  import java.util.{Timer,TimerTask}
  def afterDelay(ms: Long)(op: =>Unit) = new Timer().schedule(new TimerTask {override def run = op},ms)
  def repeat(n: Int,f: Int=>Unit) = {
    import java.util.concurrent._
    val e=Executors.newCachedThreadPool //newFixedThreadPool(cpus+1)
    (0 until n).foreach(i=>e.execute(new Runnable {def run = f(i)}))
    e.shutdown
    e.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
  }
}
2 голосов
/ 20 ноября 2009

Последний выпуск Функциональная Java имеет некоторые функции параллелизма высшего порядка, которые вы можете использовать.

import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._

val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))

А потом ...

par.parMap(vals, f)

Не забудьте shutdown pool.

2 голосов
/ 18 ноября 2009

Я бы использовал scala.actors.Futures:

vals.foreach(t => scala.actors.Futures.future(f(t)))
0 голосов
/ 14 мая 2014

Вы можете использовать Parallel Collections из стандартной библиотеки Scala. Они как обычные коллекции, но их операции выполняются параллельно. Вам просто нужно сделать вызов par, прежде чем вызывать операцию с некоторыми коллекциями.

import scala.collection._

val array = new Array[String](10000)
for (i <- (0 until 10000).par) array(i) = i.toString
...