Как перезапустить актера Акка самостоятельно? - PullRequest
1 голос
/ 18 марта 2020

Когда дочерний актер получает пользовательское сообщение RESTART, он должен перезапустить себя.
(Цель - сбросить переменные-члены актора, перезагрузить внешнее состояние из базы данных, но не очистить внутреннюю очередь сообщений актера)

Для реализации перезапуска одним из обходных путей является то, что дочерний актор создает пользовательское исключение, а родительский актор настраивает его OneForOneStrategy для перезапуска дочернего актора для этого указанного c типа исключения.

I мне интересно, есть ли более простой подход к перезагрузке?

1 Ответ

1 голос
/ 18 марта 2020

Цель состоит в том, чтобы сбросить переменные-члены актера, перезагрузить внешнее состояние из db

Я думаю, это, вероятно, самая большая проблема, потому что загрузка внешнего состояния может занять некоторое время, а также блокировать операцию, следовательно, результат операции равен или должен быть Future[] - поэтому во время этой будущей загрузки ваш актер должен игнорировать все другие сообщения, пока не будет получено состояние из БД.

Я думаю В этом случае вам может помочь метод ActorCell#become, так что вы можете изменить метод получения на другой, который будет игнорировать остальные сообщения, кроме сообщений с состоянием БД или данными, а затем переключиться обратно на обычный прием.

Пожалуйста см. пример кода ниже:

  import akka.actor.Actor
  import akka.pattern._
  import scala.concurrent.Future
  import scala.collection.mutable


  // Database API and external state model example
  case class DbExternalState()
  trait Database {
    def loadExternalState: Future[DbExternalState]
  }

  import RestartActor._
  class RestartActor(database: Database) extends Actor {
    private var state = ActorState()
    private val suspendedMessages = mutable.Queue[Any]()

    override def receive: Receive = defaultReceive

    private def defaultReceive: Receive = {
      case Restart => restartActorStart()
    }

    /**
     * Wait until message with internal state received and ignore all the other messages (put back un queue)
     */
    private def suspendedReceive: Receive = {
      case ExternalStateLoaded(state) => restartActorFinish(state)
      case message => suspendedMessages.enqueue(message)
    }

    private def restartActorStart(): Unit = {
      import context.dispatcher
      context.become(suspendedReceive)
      database.loadExternalState.map(ExternalStateLoaded) pipeTo self
    }

    private def restartActorFinish(dbExternalState: DbExternalState): Unit = {
      state = ActorState.initial(dbExternalState)
      context.become(defaultReceive) // Return to normal message handling flow
      suspendedMessages.foreach(message => self ! message)
      suspendedMessages.clear()
    }
  }

  object RestartActor {
    // Restart
    case object Restart
    case class ExternalStateLoaded(state: DbExternalState)

    case class ActorState(internalState: List[String] = Nil, externalState: DbExternalState = DbExternalState())

    object ActorState {
      def initial(externalState: DbExternalState): ActorState = ActorState(externalState = externalState)
    }
  }

Пожалуйста, дайте мне знать, предложения были правильными. Надеюсь, это поможет!

...