Несколько актеров Скала, обслуживающих одну задачу - PullRequest
2 голосов
/ 24 марта 2011

Мне нужно обрабатывать несколько значений данных параллельно («SIMD»).Я могу использовать API java.util.concurrent (Executors.newFixedThreadPool()) для обработки нескольких значений в параллелях с использованием Future экземпляров:

import java.util.concurrent.{Executors, Callable}

class ExecutorsTest {
  private class Process(value: Int)
      extends Callable[Int] {
    def call(): Int = {
      // Do some time-consuming task
      value
    }
  }

  val executorService = {
    val threads = Runtime.getRuntime.availableProcessors
    Executors.newFixedThreadPool(threads)
  }

  val processes = for (process <- 1 to 1000) yield new Process(process)

  val futures = executorService.invokeAll(processes)

  // Wait for futures
}

Как мне сделать то же самое, используя Actors?Я не верю, что хочу «подать» все процессы одному актору, потому что тогда актер будет выполнять их последовательно.

Нужно ли создавать несколько акторов-процессоров с актером-диспетчером?который отправляет равное количество процессов каждому «процессору» актера?

Ответы [ 2 ]

10 голосов
/ 24 марта 2011

Если вам нужна только обработка по принципу "забыл и забыл", почему бы не использовать фьючерсы Scala?

import scala.actors.Futures._
def example = {
  val answers = (1 to 4).map(x => future {
    Thread.sleep(x*1000)
    println("Slept for "+x)
    x
  })
  val t0 = System.nanoTime
  awaitAll(1000000,answers: _*)  // Number is timeout in ms
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  answers.map(_()).sum
}

scala> example
Slept for 1
Slept for 2
Slept for 3
Slept for 4
4.000 seconds elapsed
res1: Int = 10

По сути, все, что вам нужно, это поместить нужный код в блок future { }, и он будетнемедленно вернуть будущее;примените его, чтобы получить ответ (он будет заблокирован, пока не будет выполнено), или используйте awaitAll с таймаутом для ожидания, пока все не закончат.


Обновление: с 2.11 способ сделать этос scala.concurrent.Future.Перевод приведенного выше кода:

import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

def example = {
  val answers = Future.sequence(
    (1 to 4).map(x => Future {
      Thread.sleep(x*1000)
      println("Slept for "+x)
      x
    })
  )
  val t0 = System.nanoTime
  val completed = Await.result(answers, Duration(1000, SECONDS))
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  completed.sum
}
3 голосов
/ 24 марта 2011

Если вы можете использовать Akka, взгляните на поддержку ActorPool: http://doc.akka.io/routing-scala

Позволяет вам указать параметры того, сколько актеров вы хотите запустить параллельно, а затем отправлять работу этим акторам.

...