Мне нужно создавать переменное количество актеров каждый раз, когда запускается моя программа, а затем должен гарантировать, что все ответы возвращаются через определенный промежуток времени.Эта ссылка дает хорошее представление о фиксированном количестве актеров, но как насчет динамического числа?
Это мой код, который создает актера и передает ему сообщения:
ruleList = ...
val childActorList: Iterable[ActorRef] = ruleList.map(ruleItem =>
context.actorOf(DbActor.props(ruleItem.parameter1, ruleItem.parameter2)))
implicit val timeout = Timeout(10.second)
childActorList.foreach(childActor =>
childActor ? (tempTableName, lastDate)
)
Обновлено-1
Согласно руководствам @Raman Mishra, я обновил свой код, как показано ниже, это код в родительском актере:
override val supervisorStrategy: SupervisorStrategy = {
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
case exp: SQLException => //Resume;
throw exp
case exp:AskTimeoutException => throw exp
case other: Exception => throw other
}
}
override def receive: Receive = {
case Start(tempTableName, lastDate) => {
implicit val timeout = Timeout(10.second)
ruleList.foreach { ruleItem =>
val childActor = context.actorOf(DbActor.props(ruleItem._1, query = ruleItem._2))
ask(childActor, (tempTableName, lastDate)).mapTo[Seq[Int]]
onComplete {
lastDate)).mapTo[Seq[Int]] onComplete {
case util.Success(res) => println("done" + res + ruleItem._2)
case util.Failure(exp: AskTimeoutException) => println("Failed query:" + ruleItem._2); throw exp
case other => println(other)
}
}
И в дочернемactor:
case (brokerTableName, lastDate) => {
Logger("Started query by actor" + self.path.name + ':' +
val repo = new Db()
val res = repo.getAggResult(query = (brokerTableName, lastDate))
val resWrapper = res match {
case elem: Future[Any] => elem
case elem:Any => Future(elem)
}
resWrapper pipeTo self
}
case res:List[Map[Any, Any]] => {
// here final result is send to parent actor
repo.insertAggresults(res, aggTableName) pipeTo context.parent
}
Теперь, когда я запускаю основное приложение, сначала запускается родительский актер, который создает дочерних акторов и отправляет им сообщения, используя метод ask.Дочерние актеры выполняют свои задачи, но проблема здесь в том, что ответ дочерних актеров никогда не возвращается обратно родительскому актору, и при каждом запуске приложения происходит AskTimeoutException
.Я сомневаюсь, что использование метода onComplete
является правильным или нет.Мы будем благодарны за любую помощь.
"Обновлено-2"
Я обнаружил, что проблема в использовании context.parent
вместо sender ().Кроме того, когда я передаю трубку отправителю, первую часть моего результата, и отправитель запрашивает вторую часть, проблема решается, но я не знаю, что здесь происходит, почему я не могу передать себе и вернуть окончательный результатparent?
Это последний код:
В родительском акторе:
override def receive: Receive = {
case Start(tempTableName, lastDate) => {
println("started: called by remote actor")
implicit val timeout = Timeout(5 second)
ruleList.foreach { ruleItem =>
val childActor = context.actorOf(DbActor.props(ruleItem._1, query = ruleItem._2))
ask(childActor, Broker(tempTableName, lastDate)) onComplete {
// (childActor ? Broker(tempTableName, lastDate)).mapTo[Seq[Int]] onComplete {
case util.Success(res: List[Map[Any, Any]]) => (childActor ? res) onComplete {
case util.Success(res: Seq[Any]) => println("Successfull- Num,ber of documents:" + res.length + " " + ruleItem._2)
case util.Failure(exp: AskTimeoutException) => println("Failed for writing - query:" + ruleItem._2); throw exp
}
case util.Failure(exp: AskTimeoutException) => println("Failed for reading - query :" + ruleItem._2); throw exp
case other => println(other)
}
}
}
}
В дочернем актере:
case (brokerTableName, lastDate) => {
Logger("Started query by actor" + self.path.name + ':' +
val repo = new Db()
val res = repo.getAggResult(query = (brokerTableName, lastDate))
val resWrapper = res match {
case elem: Future[Any] => elem
case elem:Any => Future(elem)
}
resWrapper pipeTo sender()
}
case res:List[Map[Any, Any]] => {
// here final result is send to parent actor
repo.insertAggresults(res, aggTableName) pipeTo sender()
}