Как возобновить действия актера akka, который потерпел неудачу при обработке отправляемого им сообщения? - PullRequest
1 голос
/ 11 июля 2019

У меня есть следующий пример кода актера.

object SomeExternalDep {
  private var flag = true

  // this function will throw an exception once when called with the value 3, then it won't throw another exception
  @throws[RuntimeException]
  def potentiallyThrows(curr: Int): Unit = {
    if (curr == 3 && flag) {
      flag = false
      throw new RuntimeException("Something went wrong in external dependency")
    }
  }
}

class CountingActor(start: Int, end: Int)
  extends Actor
    with ActorLogging {

  var curr: Int = start

  // This method counts for us
  private def doCount(): Unit = {
    // This may throw, which will cause this actor to fail
    SomeExternalDep.potentiallyThrows(curr)

    // Send self a count message. If the above call exceptions this is never called
    if (curr <= end) {
      self ! CountingActor.Count(curr)
    }
  }

  override def receive: Receive = {
    case CountingActor.Start => doCount()
    case CountingActor.Count(n) =>
      log.info(s"Counting: $n")
      curr += 1
      doCount()

    case x => log.error(s"bad message $x")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(s"CountingActor failed while processing $message")
    self ! CountingActor.Start
  }
}

object CountingActor {
  def props(start: Int, end: Int): Props = Props(new CountingActor(start, end))

  case object Start
  case class Count(n: Int)
}

class SupervisorActor
  extends Actor
    with ActorLogging {

  var countingActor: ActorRef = _

  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      // case _: RuntimeException => Restart
      case _: RuntimeException => Resume
    }

  private def doStart(): Unit = {
    countingActor = context.actorOf(CountingActor.props(0, 5))

    countingActor ! CountingActor.Start
  }

  override def receive: Receive = {
    case SupervisorActor.Init => doStart()
    case _ => log.error(s"Supervisor doesn't process messages")
  }

}

Здесь CountingActor в основном отправляет себе сообщение Count.Затем он вызывает некоторую внешнюю зависимость, которая может дать сбой.Он также вносит некоторые изменения в свое внутреннее состояние при подсчете.Я также реализовал простой SupervisorActor.Этот актер создает CountingActor в качестве своего потомка.

Когда стратегия наблюдения установлена ​​на Restart.Я получаю ожидаемый результат.Актер считает до 3, терпит неудачу, потому что он видит исключение.Хук preRestart отправляет новое сообщение Start в почтовый ящик и снова начинает считать.

[INFO] [07/10/2019 15:23:59.895] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:23:59.896] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:23:59.896] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 2
[ERROR] [07/10/2019 15:23:59.905] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Something went wrong in external dependency
java.lang.RuntimeException: Something went wrong in external dependency
    at SomeExternalDep$.potentiallyThrows(ActorSupervisionTest.scala:15)
    at CountingActor.CountingActor$$doCount(ActorSupervisionTest.scala:30)

<Stack Trace omitted>

[ERROR] [07/10/2019 15:23:59.909] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] CountingActor failed while processing Some(Count(2))
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 2
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 3
[INFO] [07/10/2019 15:23:59.913] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 4
[INFO] [07/10/2019 15:23:59.913] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 5

Но когда я изменяю стратегию наблюдения на Resume.Актер застревает, потому что он потерпел неудачу, прежде чем он смог отправить себе следующее Count сообщение.

[INFO] [07/10/2019 15:26:01.779] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:26:01.780] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:26:01.780] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 2
[WARN] [07/10/2019 15:26:01.786] [counting-sys-akka.actor.default-dispatcher-4] [akka://counting-sys/user/$a/$a] Something went wrong in external dependency

Как мне обойти это, чтобы я мог возобновить подсчет с 3, если внешняя зависимость не удалась?

1 Ответ

2 голосов
/ 11 июля 2019

Похоже, что код, который фактически запускается. Ваша логика - это в основном цикл от 1 до N, где на каждой итерации вы отправляете сообщение для перехода к следующей итерации, проблема в том, что если выдается исключение, вы неЧтобы отправить сообщение для перехода к следующей итерации, здесь супервизор выполняет свою работу, перезапуск прост, потому что выполняется код для повторного запуска цикла, но если вы возобновите поток, появится сообщение для перехода к следующей итерации.никогда не отправляется.

Простой обходной путь - изменить порядок операций для метода doCount, сначала отправив сообщение самому себе, а затем, обрабатывая опасную операцию, это должно работать для стратегии Resumeно я бы протестировал некоторые сценарии, прежде чем на самом деле использовать этот подход, один из которых мне неизвестен, будет ли akka отбрасывать почтовый ящик в случае стратегии Restart, я считаю, что это не так, что означает, что после перезапуска субъекта он будетполучить ожидающее сообщение.

Другим обходным решением может быть повторная отправкасообщение от супервизора после возобновления дочернего актера.

Редактировать : Я немного заглянул в источник akka, и нет очевидного способа отловить событие возобновления, на самом деле естьвнутреннее событие Resume, но оно является приватным для akka и не отправляется вашему действующему действующему лицу, я думаю, что если вам нравится использовать стратегию Resume, не связывайтесь с супервизором и просто поймайте возможные исключения внутри вашегоактер (который в основном эмулирует стратегию возобновления), это должно дать вам ожидаемое поведение вместо того, чтобы иметь дело с возможными угловыми случаями.

...