Можно ли использовать «реакцию», чтобы дождаться завершения ряда дочерних актеров, а затем продолжить? - PullRequest
2 голосов
/ 29 марта 2012

Я все время пытаюсь заставить это работать. Новичок в scala и для актеров, поэтому может непреднамеренно принимать плохие дизайнерские решения - пожалуйста, скажите мне, если так.

Установка такова:

У меня есть управляющий актер, который содержит несколько рабочих актеров. Каждый работник представляет расчет, который для данного входа будет выплевывать 1..n выходов. Контроллер должен отключить каждого работника, собрать возвращенные результаты, затем продолжить и сделать еще кучу вещей, как только это будет завершено. Вот как я подошел к этому, используя receive в актере контроллера:

class WorkerActor extends Actor {
  def act() {
    loop {
      react {
        case DoJob =>
          for (1 to n) sender ! Result
          sender ! Done
      }
    }
  }
}

Рабочий актер достаточно прост - он выдает результаты до тех пор, пока не выполнит его, когда отправляет обратно сообщение Done.

class ControllerActor(val workers: List[WorkerActor]) extends Actor {
  def act() {
     workers.foreach(w => w ! DoJob)
     receiveResults(workers.size)

     //do a bunch of other stuff
  }

  def receiveResults(count: Int) {
    if (count == 0) return

    receive {
      case Result => 
        // do something with this result (that updates own mutable state)
        receiveResults(count)
      case Done 
        receiveResults(count - 1)
    }
  }
}

Актер-контролер запускает каждого из рабочих, затем рекурсивно принимает вызовы до тех пор, пока не получит сообщение Done для каждого из рабочих.

Это работает, но мне нужно создать множество актеров контроллера, поэтому receive слишком тяжелый - мне нужно заменить его на react.

Однако, когда я использую react, закулисное исключение срабатывает после обработки окончательного сообщения Done, и метод act субъекта контроллера замыкается накоротко, поэтому ни один из "//do a bunch of other stuff ", которое приходит после того, как происходит.

Я могу заставить что-то произойти после последнего Done сообщения, используя andThen { } - но мне действительно нужно выполнить несколько наборов вычислений таким образом, чтобы в итоге получилась смешно вложенная структура andThen { andThen { andThen } } с.

Я также хочу скрыть эту сложность в методе, который затем будет перемещен в отдельную черту, так что актер-контроллер с несколькими списками рабочих актеров может просто выглядеть примерно так:

class ControllerActor extends Actor with CalculatingTrait { 
//CalculatingTrait has performCalculations method

    val listOne: List[WorkerActor]
    val ListTwo: List[WorkerActor]   
    def act {
      performCalculations(listOne)
      performCalculations(listTwo)
    }
}

Так есть ли способ остановить короткое замыкание метода act в методе executeCalculations? Есть ли лучший дизайнерский подход, который я мог бы использовать?

Ответы [ 3 ]

2 голосов
/ 30 марта 2012

Вы можете избежать react / receive полностью, используя актера АккиВот как может выглядеть ваша реализация:

import akka.actor._

class WorkerActor extends Actor {
  def receive = {
    case DoJob =>
      for (_ <- 1 to n) sender ! Result
      sender ! Done
  }
}

class ControllerActor(workers: List[ActorRef]) extends Actor {
  private[this] var countdown = workers.size

  override def preStart() {
    workers.foreach(_ ! DoJob)
  }

  def receive = {
    case Result =>
      // do something with this result
    case Done =>
      countdown -= 1
      if (countdown == 0) {
        // do a bunch of other stuff

        // It looks like your controllers die when the workers
        // are done, so I'll do the same.
        self ! PoisonPill
      }
  }
}
1 голос
/ 29 марта 2012

РЕДАКТИРОВАТЬ: только что читали об актерах Akka и обнаружили, что они "гарантируют порядок сообщений на основе отправителя". Поэтому я обновил свой пример таким образом, что, если контроллеру нужно было позже запросить у получателя вычисленное значение и необходимо убедиться, что оно было завершено, он мог бы сделать это с гарантией порядка сообщений только для каждого отправителя (например, все еще актеры скала, а не акка).

Наконец, с небольшой помощью из ответа @ Destin меня поразило, что я мог бы сделать это намного проще, отделив часть контроллера, отвечающую за отбрасывание рабочих, от части, ответственной за принятие и использование результатов. , Полагаю, принцип единой ответственности ... Вот что я сделал (разделив первоначальный управляющий субъект на управляющий класс и актер-получатель):

case class DoJob(receiever: Actor)
case object Result
case object JobComplete
case object Acknowledged
case object Done

class Worker extends Actor {
  def act {
    loop {
      react {
        case DoJob(receiver) => 
          receiver ! Result
          receiver ! Result
          receiver !? JobComplete match {
            case Acknowledged =>
              sender ! Done
          }
      }
    }
  }
}

class Receiver extends Actor {
  def act {
    loop {
      react {
        case Result => println("Got result!")
        case JobComplete => sender ! Acknowledged
      }
    }
  }
}

class Controller {
  val receiver = new Receiver
  val workers = List(new Worker, new Worker, new Worker)

  receiver.start()
  workers.foreach(_.start())

  workers.map(_ !! DoJob(receiver)).map(_())

  println("All the jobs have been done")
}
1 голос
/ 29 марта 2012

Вот как я мог бы подойти к этому (таким образом, это кажется больше комментариев и шаблонов, чем фактическое содержание):

class WorkerController(val workerCriteria: List[WorkerCriteria]) {

  // The actors that only _I_ interact with are probably no one else's business
  // Your call, though
  val workers = generateWorkers(workerCriteria)

  // No need for an `act` method--no need for this to even be an actor

  /* Will send `DoJob` to each actor, expecting a reply from each.
   * Could also use the `!!` operator (instead of `!?`) if you wanted
   * them to return futures (so WorkerController could continue doing other
   * things while the results compute).  The futures could then be evaluated
   * with `results map (_())`, which will _then_ halt execution to wait for each
   * future that isn't already computed (if any).
   */
  val results = workers map (_ !? DoJob)
  //do a bunch of other stuff with your results

  def generateWorkers(criteria: List[WorkerCriteria]) = // Create some workers!

}

class Worker extends Actor {
  def act() {
    loop {
      react {
        case DoJob =>
          // Will generate a result and send it back to the caller
          reply(generateResult)
      }
    }
  }
  def generateResult = // Result?
}
...