Как переписать цикл for с общей зависимостью, используя актеры - PullRequest
5 голосов
/ 22 марта 2010

У нас есть код, который должен работать быстрее. Он уже профилирован, поэтому мы хотели бы использовать несколько потоков. Обычно я настраиваю очередь в памяти, и у меня есть несколько потоков, берущих задания из очереди и вычисляющих результаты. Для общих данных я бы использовал ConcurrentHashMap или аналогичный.

Я действительно не хочу идти по этому пути снова. Из того, что я прочитал, использование акторов приведет к более чистому коду, и если я использую akka, переход на более чем 1 jvm должен быть проще. Это правда?

Однако я не знаю, как мыслить в актерах, поэтому я не уверен, с чего начать.

Чтобы лучше понять проблему, вот пример кода:

case class Trade(price:Double, volume:Int, stock:String) {
  def value(priceCalculator:PriceCalculator) =
    (priceCalculator.priceFor(stock)-> price)*volume
}
class PriceCalculator {
  def priceFor(stock:String) = {
    Thread.sleep(20)//a slow operation which can be cached
    50.0
  }
}
object ValueTrades {

  def valueAll(trades:List[Trade],
      priceCalculator:PriceCalculator):List[(Trade,Double)] = {
    trades.map { trade => (trade,trade.value(priceCalculator)) }
  }

  def main(args:Array[String]) {
    val trades = List(
      Trade(30.5, 10, "Foo"),
      Trade(30.5, 20, "Foo")
      //usually much longer
    )
    val priceCalculator = new PriceCalculator
    val values = valueAll(trades, priceCalculator)
  }

}

Я был бы признателен, если бы кто-то с опытом использования актеров мог предложить, как это отобразится на актерах.

Ответы [ 2 ]

3 голосов
/ 23 марта 2010

Это дополнение к моему комментарию к общим результатам для дорогих вычислений. Вот оно:

import scala.actors._
import Actor._
import Futures._

case class PriceFor(stock: String) // Ask for result

// The following could be an "object" as well, if it's supposed to be singleton
class PriceCalculator extends Actor {
  val map = new scala.collection.mutable.HashMap[String, Future[Double]]()
  def act = loop {
    react {
      case PriceFor(stock) => reply(map getOrElseUpdate (stock, future {
        Thread.sleep(2000) // a slow operation
        50.0
      }))
    }
  }
}

Вот пример использования:

scala> val pc = new PriceCalculator; pc.start
pc: PriceCalculator = PriceCalculator@141fe06

scala> class Test(stock: String) extends Actor {
     |   def act = {
     |     println(System.currentTimeMillis().toString+": Asking for stock "+stock)
     |     val f = (pc !? PriceFor(stock)).asInstanceOf[Future[Double]]
     |     println(System.currentTimeMillis().toString+": Got the future back")
     |     val res = f.apply() // this blocks until the result is ready
     |     println(System.currentTimeMillis().toString+": Value: "+res)
     |   }
     | }
defined class Test

scala> List("abc", "def", "abc").map(new Test(_)).map(_.start)
1269310737461: Asking for stock abc
res37: List[scala.actors.Actor] = List(Test@6d888e, Test@1203c7f, Test@163d118)
1269310737461: Asking for stock abc
1269310737461: Asking for stock def
1269310737464: Got the future back

scala> 1269310737462: Got the future back
1269310737465: Got the future back
1269310739462: Value: 50.0
1269310739462: Value: 50.0
1269310739465: Value: 50.0


scala> new Test("abc").start // Should return instantly
1269310755364: Asking for stock abc
res38: scala.actors.Actor = Test@15b5b68
1269310755365: Got the future back

scala> 1269310755367: Value: 50.0
2 голосов
/ 22 марта 2010

Для простого распараллеливания, когда я отбрасываю кучу работы для обработки, а затем жду, пока все это вернется, я склонен использовать шаблон Futures.

class ActorExample {
  import actors._
  import Actor._
  class Worker(val id: Int) extends Actor {
    def busywork(i0: Int, i1: Int) = {
      var sum,i = i0
      while (i < i1) {
        i += 1
        sum += 42*i
      }
      sum
    }
    def act() { loop { react {
      case (i0:Int,i1:Int) => sender ! busywork(i0,i1)
      case None => exit()
    }}}
  }

  val workforce = (1 to 4).map(i => new Worker(i)).toList

  def parallelFourSums = {
    workforce.foreach(_.start())
    val futures = workforce.map(w => w !! ((w.id,1000000000)) );
    val computed = futures.map(f => f() match {
      case i:Int => i
      case _ => throw new IllegalArgumentException("I wanted an int!")
    })
    workforce.foreach(_ ! None)
    computed
  }

  def serialFourSums = {
    val solo = workforce.head
    workforce.map(w => solo.busywork(w.id,1000000000))
  }

  def timed(f: => List[Int]) = {
    val t0 = System.nanoTime
    val result = f
    val t1 = System.nanoTime
    (result, t1-t0)
  }

  def go {
    val serial = timed( serialFourSums )
    val parallel = timed( parallelFourSums )
    println("Serial result:  " + serial._1)
    println("Parallel result:" + parallel._1)
    printf("Serial took   %.3f seconds\n",serial._2*1e-9)
    printf("Parallel took %.3f seconds\n",parallel._2*1e-9)
  }
}

По сути, идеясоздать коллекцию работников - по одному на рабочую нагрузку - и затем выбросить в них все данные с помощью !!который немедленно возвращает будущее.Когда вы пытаетесь прочитать будущее, отправитель блокируется до тех пор, пока работник фактически не завершит работу с данными.

Вы можете переписать вышеприведенное, чтобы вместо этого PriceCalculator extended Actor, а valueAll координировали возвратданные.

Обратите внимание, что вы должны быть осторожны, передавая неизменяемые данные.

В любом случае, на машине, с которой я это печатаю, если вы запустите приведенное выше, вы получите:

scala> (new ActorExample).go
Serial result:  List(-1629056553, -1629056636, -1629056761, -1629056928)
Parallel result:List(-1629056553, -1629056636, -1629056761, -1629056928)
Serial took   1.532 seconds
Parallel took 0.443 seconds

(Очевидно, у меня есть по крайней мере четыре ядра; параллельная синхронизация довольно немного варьируется в зависимости от того, какой работник получает какой процессор и что еще происходит на машине.)

...