akka - как обеспечить, чтобы все ответы динамического числа актеров возвращались родительскому актеру? - PullRequest
0 голосов
/ 13 ноября 2018

Мне нужно создавать переменное количество актеров каждый раз, когда запускается моя программа, а затем должен гарантировать, что все ответы возвращаются через определенный промежуток времени.Эта ссылка дает хорошее представление о фиксированном количестве актеров, но как насчет динамического числа?

Это мой код, который создает актера и передает ему сообщения:

   ruleList = ...
   val childActorList: Iterable[ActorRef] = ruleList.map(ruleItem =>
    context.actorOf(DbActor.props(ruleItem.parameter1, ruleItem.parameter2)))

  implicit val timeout = Timeout(10.second)
  childActorList.foreach(childActor =>
    childActor ? (tempTableName, lastDate)
  )

Обновлено-1

Согласно руководствам @Raman Mishra, я обновил свой код, как показано ниже, это код в родительском актере:

override val supervisorStrategy: SupervisorStrategy = {
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
  case exp: SQLException => //Resume;
   throw exp
  case exp:AskTimeoutException =>  throw exp
  case other: Exception => throw other
 }
}

override def receive: Receive = {

case Start(tempTableName, lastDate) => {

implicit val timeout = Timeout(10.second)
ruleList.foreach { ruleItem =>
    val childActor = context.actorOf(DbActor.props(ruleItem._1, query = ruleItem._2))
    ask(childActor, (tempTableName, lastDate)).mapTo[Seq[Int]] 
  onComplete {
  lastDate)).mapTo[Seq[Int]] onComplete {
      case util.Success(res) => println("done" + res + ruleItem._2)
      case util.Failure(exp: AskTimeoutException) => println("Failed query:" + ruleItem._2); throw exp
      case other => println(other)
    }
  }

И в дочернемactor:

  case (brokerTableName, lastDate) => {
    Logger("Started query by actor" + self.path.name + ':' + 
  val repo = new Db()
  val res = repo.getAggResult(query = (brokerTableName, lastDate))

  val resWrapper = res match {
    case elem: Future[Any] => elem
    case elem:Any => Future(elem)
  }
  resWrapper pipeTo self
}
case res:List[Map[Any, Any]] => {
  // here final result is send to parent actor
  repo.insertAggresults(res, aggTableName) pipeTo context.parent
}

Теперь, когда я запускаю основное приложение, сначала запускается родительский актер, который создает дочерних акторов и отправляет им сообщения, используя метод ask.Дочерние актеры выполняют свои задачи, но проблема здесь в том, что ответ дочерних актеров никогда не возвращается обратно родительскому актору, и при каждом запуске приложения происходит AskTimeoutException.Я сомневаюсь, что использование метода onComplete является правильным или нет.Мы будем благодарны за любую помощь.

"Обновлено-2"

Я обнаружил, что проблема в использовании context.parent вместо sender ().Кроме того, когда я передаю трубку отправителю, первую часть моего результата, и отправитель запрашивает вторую часть, проблема решается, но я не знаю, что здесь происходит, почему я не могу передать себе и вернуть окончательный результатparent?

Это последний код:

В родительском акторе:

    override def receive: Receive = {

case Start(tempTableName, lastDate) => {
  println("started: called by remote actor")
  implicit val timeout = Timeout(5 second)
  ruleList.foreach { ruleItem =>
    val childActor = context.actorOf(DbActor.props(ruleItem._1, query = ruleItem._2))
    ask(childActor, Broker(tempTableName, lastDate)) onComplete {
      //        (childActor ? Broker(tempTableName, lastDate)).mapTo[Seq[Int]] onComplete {
      case util.Success(res: List[Map[Any, Any]]) => (childActor ? res) onComplete {
        case util.Success(res: Seq[Any]) => println("Successfull- Num,ber of documents:" + res.length + " " + ruleItem._2)
        case util.Failure(exp: AskTimeoutException) => println("Failed for writing - query:" + ruleItem._2); throw exp
      }
      case util.Failure(exp: AskTimeoutException) => println("Failed for reading - query :" + ruleItem._2); throw exp
      case other => println(other)
     }
   }

 }

}

В дочернем актере:

  case (brokerTableName, lastDate) => {
    Logger("Started query by actor" + self.path.name + ':' + 
  val repo = new Db()
  val res = repo.getAggResult(query = (brokerTableName, lastDate))

  val resWrapper = res match {
    case elem: Future[Any] => elem
    case elem:Any => Future(elem)
  }
  resWrapper pipeTo sender()
}
case res:List[Map[Any, Any]] => {
  // here final result is send to parent actor
  repo.insertAggresults(res, aggTableName) pipeTo sender()
}

1 Ответ

0 голосов
/ 21 ноября 2018

Причина, по которой ответ на sender() работает там, где не отвечает на context.parent, заключается в том, что запрос создает временного субъекта для обработки ответа.Вам нужно ответить этому временному действующему лицу: отправителю (который отличается от родителя).

Также не ясно, блокирует ли метод getAggResult.Если это так, это не поможет (см. здесь ).

...