Имеет ли Akka Decider доступ к полному сценарию отказа? - PullRequest
0 голосов
/ 15 мая 2018

Новый для Акки.Создание нового класса Scala, расширяющего SupervisorStrategy, дает мне следующий шаблон для работы:

class MySupervisorStrategy extends SupervisorStrategy {
  override def decider: Decider = ???

  override def handleChildTerminated(context: ActorContext, child: ActorRef,
    children: Iterable[ActorRef]): Unit = ???

  override def processFailure(context: ActorContext, restart: Boolean,
    child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = ???
}

Я ищу способ доступа:

  1. Throwable / Exception, которое было выброшено из дочернего актера
  2. Дочерний актер ActorRef, выдавший исключение
  3. Сообщение, которое было передано дочернему актору, который вызвал исключение для выброса

Я думаю, Decider (который на самом деле PartialFunction[Throwable,Directive]) получает Throwable всякий раз, когда ребенок выдает исключение, но я не вижу, гдеЯ мог бы получить доступ к № 2 и № 3 из моего списка выше. Есть идеи?


Обновление

Из опубликованной скрипки выглядит как действительный Decider:

{
    case ActorException(ref,t,"stop")      =>
      println(s"Received 'stop' from ${ref}")
      Stop
    case ActorException(ref,t,"restart")      =>
      println(s"Received 'restart' from ${ref}")
      Restart
    case ActorException(ref,t,"resume")      =>
      println(s"Received 'resume' from ${ref}")
      Resume
}

вышеЯ вижу все три:

  1. Исключение, которое было сгенерировано дочерним элементом
  2. Дочерний объект (ref), выдавший исключение
  3. Сообщение, которое былопервоначально отправлено ребенку (что вызвало исключение)

Похоже, что в этом Decider нет ничего, что нуждается в , который будет определен внутри этого Supervisor класса,Я хотел бы вывести логику Decider, скажем, в MyDecider.scala и найти способ реорганизовать Supervisor, чтобы его supervisorStrategy использовал экземпляр MyDecider, так что, возможно, что-то похожее на:

class Supervisor extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  var child: ActorRef = _

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute, decider = myDecider)

  ...
}

1 Ответ

0 голосов
/ 17 мая 2018

Для # 2 вы можете получить доступ к отправителю ", если стратегия объявлена ​​внутри контролирующего субъекта "

Если стратегия объявлена ​​внутри контролирующего субъекта (в отличие от объекта-компаньона), ее решающее устройство имеет доступ ко всем внутренним состояниям субъекта потокобезопасным способом, , включая получение ссылки на текущий сбойный субъект child (доступно как отправитель сообщения об ошибке) .

Сообщение недоступно, поэтому единственный вариант - перехватить ваше исключение и выдать специальное сообщение с полученным сообщением.

Вот быстрая скрипка

class ActorSO extends Actor {

  def _receive: Receive = {
    case e =>
      println(e)
      throw new RuntimeException(e.toString)
  }

  final def receive = {
    case any => try {
      _receive(any)
    }
    catch {
      case t:Throwable => throw new ActorException(self,t,any)
    }

  }

}

Обновление

A Decider - это просто PartialFunction, поэтому вы можете передать его в конструктор.

object SupervisorActor {
  def props(decider: Decider) = Props(new SupervisorActor(decider))
}

class SupervisorActor(decider: Decider) extends Actor {

  override val supervisorStrategy = OneForOneStrategy()(decider)

  override def receive: Receive = ???
}

class MyDecider extends Decider {
  override def isDefinedAt(x: Throwable): Boolean = true

  override def apply(v1: Throwable): SupervisorStrategy.Directive = {
    case t:ActorException => Restart
    case notmatched => SupervisorStrategy.defaultDecider.apply(notmatched)
  }
}

object Test {
  val myDecider: Decider = {
    case t:ActorException => Restart
    case notmatched => SupervisorStrategy.defaultDecider.apply(notmatched)
  }
  val myDecider2 = new MyDecider()
  val system = ActorSystem("stackoverflow")
  val supervisor = system.actorOf(SupervisorActor.props(myDecider))
  val supervisor2 = system.actorOf(SupervisorActor.props(myDecider2))
}

Таким образом, вы не сможете получить доступ к состоянию супервизора, как ActorRef ребенка, который выдает исключение через sender() (хотя мы включаем это в ActorException)

Относительно вашего исходного вопроса о доступе от супервизора дочернего сообщения, которое вызывает исключение, вы можете увидеть здесь (из Акки 2.5.3), что разработчики Акки решили не делать его доступным для принятия решения.

  final protected def handleFailure(f: Failed): Unit = {
    // ¡¡¡ currentMessage.message is the one that cause the exception !!!
    currentMessage = Envelope(f, f.child, system)
    getChildByRef(f.child) match {
      /*
       * only act upon the failure, if it comes from a currently known child;
       * the UID protects against reception of a Failed from a child which was
       * killed in preRestart and re-created in postRestart
       */
      case Some(stats) if stats.uid == f.uid ⇒
        // ¡¡¡ currentMessage.message is not passed to the handleFailure !!!
        if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
      case Some(stats) ⇒
        publish(Debug(self.path.toString, clazz(actor),
          "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
      case None ⇒
        publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
    }
  }
...