Использование фьючерсов и актеров akka для распараллеливания списка - PullRequest
1 голос
/ 12 октября 2011

Я хочу отправить список сообщений актеру, сразу же получить ответ в будущем, а затем дождаться завершения всех будущих операций, прежде чем вернуться к вызывающему методу.После прочтения документов akka я считаю, что Future.sequence - это путь, но я не смог заставить следующий код работать правильно.Я получаю эту ошибку от компилятора:

  found   : akka.dispatch.ActorCompletableFuture
  required: akka.dispatch.Future[Integer]
Error occurred in an application involving default arguments.
            futures += secondary ? GetRandom
                                 ^

Я уверен, что упускаю что-то очевидное, но приведенный ниже код кажется «правильным» для примеров и документов API.

import java.util.Random
import akka.dispatch.Future
import akka.actor._
import Commands._
import collection.mutable.ListBuffer

object Commands {
    trait Command

    case class GetRandom() extends Command
    case class GenRandomList() extends Command  
}

class Secondary() extends Actor {
    val randomGenerator = new Random()

    override def receive = {
        case GetRandom() =>
            self reply randomGenerator.nextInt(100)
    }
}

class Primary() extends Actor {
    private val secondary = Actor.actorOf[Secondary]

    override def receive = {

        case GenRandomList() =>

            val futures = new ListBuffer[Future[Integer]]

            for (i <- 0 until 10) {
                futures += secondary ? GetRandom
            }

            val futureWithList = Future.sequence(futures)

            futureWithList.map { foo =>
                println("Shouldn't foo be an integer now? " + foo)
            }.get
    }

    override def preStart() = {
        secondary.start()
    }
}

class Starter extends App {
    println("Starting")
    var master = Actor.actorOf(new Primary()).start()
    master ! GenRandomList()
}

Как правильно отправлять серию сообщений актеру, получать будущее и возвращать его после завершения всех фьючерсов (при желании можно сохранить результаты каждого будущего в списке и вернуть его).

Ответы [ 2 ]

3 голосов
/ 12 октября 2011
(secondary ? GetRandom).mapTo[Int]
3 голосов
/ 12 октября 2011

Akka ? возвращает Future[Any], но вам нужно Future[Int].

Таким образом, вы можете определить список, который принимает все виды фьючерсов:

val futures = new ListBuffer[Future[Any]]

илиприведите результат как Int, как только он станет доступен:

for (i <- 0 until 10) {
  futures += (secondary ? GetRandom) map {
    _.asInstanceOf[Int]
  }
}

Кстати, чтобы заставить его работать, вам нужно изменить GetRandom определение:

case object GetRandom extends Command

исопоставьте это с:

case GetRandom =>
...