Какие возможности улучшить производительность / параллелизм в следующем коде Scala + Akka? - PullRequest
0 голосов
/ 29 февраля 2012

Я ищу возможности для повышения параллелизма и производительности в моем коде Scala 2.9 / Akka 2.0 RC2. Учитывая следующий код:

import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
    var buffer = scala.collection.mutable.ArrayBuffer[Double]()

    val functionsToCompute = List("f1","f2","f3","f4","f5")
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()  
    functionMap += {"f1" -> f1}
    functionMap += {"f2" -> f2}
    functionMap += {"f3" -> f3}
    functionMap += {"f4" -> f4}
    functionMap += {"f5" -> f5}

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
        buffer += data
        buffer
    }

    def f1(map:Map[String,Any]):Double = {
//    println("hello from f1")
      0.0
    }

    def f2(map:Map[String,Any]):Double = {
//    println("hello from f2")
      0.0
    }

    def f3(map:Map[String,Any]):Double = {
//    println("hello from f3")
      0.0
    }

    def f4(map:Map[String,Any]):Double = {
//    println("hello from f4")
      0.0
    }

    def f5(map:Map[String,Any]):Double = {
//    println("hello from f5")
      0.0
    }

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
        var map = Map[String,Double]()
        try {
            functionsToCompute.foreach(function => {
                val value = functionMap(function)
                function match {
                    case "f1" =>
                        var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
                        map += {function -> v}
                    case "f2" =>
                        var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f3" =>
                        var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
                        map += {function -> v}
                    case "f4" =>
                        var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f5" =>
                        var v = value(Map("buffer"->immutableBuffer))
                        map += {function -> v}
                    case _ => 
                        println(this.unhandled())
                }
            })
        } catch {
            case ex: Exception =>
              ex.printStackTrace()
        }
        map
    }

    def receive = {
      case DataDelivery(data) =>
        val startTime = System.nanoTime()/1000
        val answers = computeValues(updateData(data))
        val endTime = System.nanoTime()/1000
        val elapsedTime = endTime - startTime
        println("elapsed time is " + elapsedTime)
        // reply or forward
      case msg =>
        println("msg is " + msg)
    }

}

object Test {
    def main(args:Array[String]) {
        val system = ActorSystem("actorSystem") 
        val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
        var i = 0
        while (i < 1000) {  
            computeActor ! DataDelivery(i.toDouble)
            i += 1
        }
    }
}

Когда я запускаю это, вывод (в микросекундах) равен

elapsed time is 4898
elapsed time is 184
elapsed time is 144
    .
    .
    .
elapsed time is 109
elapsed time is 103

Вы видите, как включается инкрементальный компилятор JVM.

Я думал, что одна быстрая победа может измениться

    functionsToCompute.foreach(function => {

до

    functionsToCompute.par.foreach(function => {

но это приводит к следующим истекшим временам

elapsed time is 31689
elapsed time is 4874
elapsed time is 622
    .
    .
    .
elapsed time is 698
elapsed time is 2171

Некоторая информация:

1) Я запускаю это на Macbook Pro с 2 ядрами.

2) В полной версии функции - это длительные операции, которые зацикливаются на части изменяемого общего буфера. Похоже, это не проблема, поскольку получение сообщений из почтового ящика актера управляет потоком, но я подозреваю, что это может быть проблемой с повышенным уровнем параллелизма Вот почему я перешел на IndexedSeq.

3) В полной версии список functionsToCompute может различаться, так что не все элементы в functionMap обязательно вызываются (т.е.). FunctionMap.size может быть намного больше, чем functionsToCompute.size

4) Функции могут быть вычислены параллельно, но результирующее отображение должно быть завершено до возврата

Некоторые вопросы:

1) Что я могу сделать, чтобы параллельная версия работала быстрее?

2) Где имеет смысл добавлять неблокирующие и блокирующие фьючерсы?

3) Где имеет смысл пересылать вычисления другому актеру?

4) Какие существуют возможности для повышения неизменности / безопасности?

Спасибо, Bruce

1 Ответ

2 голосов
/ 01 марта 2012

Предоставление примера по запросу (извините за задержку ... У меня нет уведомлений для SO).

В документации Akka есть отличный пример Раздел «Составление фьючерсов» , но я дам вам кое-что более приспособленное к вашей ситуации.

Теперь, прочитав это, пожалуйста, уделите некоторое время чтению руководств и документов на веб-сайте Akka. Вам не хватает ключевой информации, которую вам предоставят эти документы.

import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work.  You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function.  It just has to return a tuple, I believe
  // with a String and a Double
  def theFunction(s: String, d: Double) = (s, d)
  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun.  You do what yo need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }
    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map instead
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it complete.  Ideally you don't do this.  Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor.  "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}

Все, что вам нужно в вашем classpath для запуска - это баночка akka-actor. Сайт Akka расскажет вам, как настроить то, что вам нужно, но на самом деле все просто.

...