Можно ли использовать блокирующие сообщения актеров, когда они будут упакованы в будущем? - PullRequest
6 голосов
/ 13 июля 2011

Мое текущее приложение основано на akka 1.1. Он имеет несколько ProjectAnalysisActors, каждый из которых отвечает за обработку задач анализа для конкретного проекта. Анализ начинается, когда такой субъект получает общее стартовое сообщение. После завершения одного шага он отправляет себе сообщение со следующим шагом, пока он определен. Выполнение кода в основном выглядит следующим образом


sealed trait AnalysisEvent {
   def run(project: Project): Future[Any]
   def nextStep: AnalysisEvent = null
}

case class StartAnalysis() extends AnalysisEvent {
   override def run ...
   override def nextStep: AnalysisEvent = new FirstStep
}

case class FirstStep() extends AnalysisEvent {
   override def run ...
   override def nextStep: AnalysisEvent = new SecondStep
}

case class SecondStep() extends AnalysisEvent {
   ...
}

class ProjectAnalysisActor(project: Project) extends Actor {

    def receive = {
        case event: AnalysisEvent =>
            val future = event.run(project)
            future.onComplete { f =>
                self ! event.nextStep
            }
    }

}

У меня есть некоторые трудности, связанные с реализацией моего кода для методов выполнения для каждого шага анализа. На данный момент я создаю новое будущее в каждом методе выполнения. Внутри этого будущего я отправляю все последующие сообщения в разные подсистемы. Некоторые из них являются неблокирующими сообщениями «забей и забудь», но некоторые из них возвращают результат, который должен быть сохранен до запуска следующего шага анализа.

На данный момент типичный метод запуска выглядит следующим образом


def run(project: Project): Future[Any] = {
    Future {
        progressActor ! typicalFireAndForget(project.name)
        val calcResult = (calcActor1 !! doCalcMessage(project)).getOrElse(...)

        val p: Project = ... // created updated project using calcResult

        val result = (storage !! updateProjectInformation(p)).getOrElse(...)
        result
    }
}

Поскольку этих блокирующих сообщений следует избегать, мне интересно, если это правильный путь. Имеет ли смысл использовать их в этом случае или мне все же следует избегать этого? Если это так, что будет правильным решением?

1 Ответ

7 голосов
/ 13 июля 2011

Очевидно, единственная цель ProjectAnalysisActor состоит в том, чтобы связывать будущие вызовы. Во-вторых, методы run также ожидают результатов, чтобы продолжить вычисления.

Так что я думаю, что вы можете попробовать рефакторинг своего кода для использования Future Composition , как объяснено здесь: http://akka.io/docs/akka/1.1/scala/futures.html

def run(project: Project): Future[Any] = {
  progressActor ! typicalFireAndForget(project.name)
  for( 
      calcResult <- calcActor1 !!! doCalcMessage(project);
      p = ... // created updated project using calcResult
      result <- storage !!! updateProjectInformation(p)
  ) yield (
    result
  )
}
...