Как настроить отказоустойчивость Akka Actor? - PullRequest
8 голосов
/ 16 августа 2011

Я пытаюсь получить отказоустойчивое поведение в akka Actors.Я работаю над кодом, который зависит от актеров в системе, доступных для длительного выполнения обработки.Я обнаружил, что моя обработка останавливается через пару часов (это должно занять около 10 часов), и ничего особенного не происходит.Я полагаю, что мои Актеры не восстанавливаются после исключений.

Что мне нужно сделать, чтобы заставить Актеров быть перезапущенными один на один навсегда?Я ожидаю, что это можно сделать из этой документации http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

Я работаю с akka 1.1.3 и scala 2.9

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._


object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
           throw new Exception("This is a simulated failure")
         println("Actor: " + name + " Received: " + num)
         //Thread.sleep(100)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  //callback method for restart handling 
  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  //callback method for restart handling 
  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
     val testActors: List[ActorRef]
     override def preStart = testActors foreach { self.startLink(_) }
     override def postStop = { System.out.println("postStop") }
}


  object FaultTest {
    def main(args : Array[String]) : Unit = {
      println("starting FaultTest.main()")
      val numOfActors = 5
      val supervisor = actorOf(
        new TestActorManager with CyclicLoadBalancing {
             val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
        }
      )

      supervisor.start();

      println("Number of Actors: " +  Actor.registry.actorsFor(classOf[TestActor]).length)

      val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

      (1 until 200 toList) foreach { testActor ! _ }

    }
  }

Этот код устанавливает 5 актеров за LoadBalancer, которыйпросто распечатайте целые числа, которые им отправляются, за исключением того, что они генерируют исключения на четных числах для имитации неисправностей.Целые числа от 0 до 200 отправляются этим субъектам.Я ожидаю, что нечетные числа получат вывод, но все кажется отключенным после пары ошибок на четных числах.Запуск этого кода с помощью sbt приводит к следующему выводу:

[info] Running FaultTest 
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM

Я думаю, что здесь происходит то, что 5 актеров начинаются, и первые 5 четных чисел выводят их из бизнеса, и они не перезапускаются.

Как изменить этот код, чтобы субъекты восстанавливались после исключений?

Я ожидаю, что на самом деле это напечатало бы все нечетные числа от 1 до 200. Я думаю, что каждый участник потерпит неудачу на четных числах, но будет перезапущен с подключенным почтовым ящиком при исключениях.Я ожидаю увидеть println от preRestart и postRestart.Что нужно настроить в этом примере кода, чтобы это произошло?

Вот некоторые дополнительные предположения об akka и актерах, которые могут привести к моему недоразумению.Я предполагаю, что Actor можно настроить с помощью Supervisor или faultHandler, чтобы он был перезапущен и оставался доступным при возникновении исключения во время получения.Я предполагаю, что сообщение, которое было отправлено актеру, будет потеряно, если оно вызовет исключение во время получения.Я предполагаю, что preRestart () и postRestart () для актера, который вызывает исключение, будут вызваны.

Пример кода представляет то, что я пытаюсь сделать, и основан на Почемумоя рассылка по актерам уменьшена в акке?

** Еще один пример кода **

Вот еще один пример кода, который является более простым.Я начинаю одного актера, который бросает исключения на четные числа.На пути нет балансировщика нагрузки или других вещей.Я пытаюсь распечатать информацию об актере.Я жду, чтобы выйти из программы в течение минуты после того, как сообщения были отправлены Актеру, и отслеживаю происходящее.

Я ожидаю, что это напечатало бы нечетные числа, но похоже, что Актер сидит без делас сообщениями в своем почтовом ящике.

Неправильно ли настроен OneForOneStrategy?Нужно ли связывать Актера с чем-то?Является ли такая конфигурация в корне неверной с моей стороны?Нужно ли как-то настраивать диспетчер с отказоустойчивостью?Могу я испортить потоки в Диспетчере?

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._

class SingleActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
            throw new Exception("This is a simulated failure, where does this get logged?")
         println("Actor: " + name + " Received: " + num)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

object TestSingleActor{

    def main(args : Array[String]) : Unit = {
      println("starting TestSingleActor.main()")

      val testActor = Actor.actorOf( new SingleActor(1) ).start()

      println("number of actors: " + registry.actors.size)
      printAllActorsInfo

      (1 until 20 toList) foreach { testActor ! _ }

      for( i <- 1 until 120 ){
        Thread.sleep(500)
        printAllActorsInfo
      }
    }

  def printAllActorsInfo() ={
    registry.actors.foreach( (a) =>
       println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
               .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
  }
}

Я получаю вывод вроде:

[info] Running TestSingleActor 
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ...

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM

Ответы [ 2 ]

5 голосов
/ 18 августа 2011

Проблема была в том, что я был с моим файлом akka.conf.Я использовал ссылочный файл akka.conf версии 1.1.3, за исключением строки, в которой настраивались обработчики событий.

мой (сломанный):

    event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

ссылка 1.1.3 (тот, который работает):

    event-handlers = ["akka.event.EventHandler$DefaultListener"]

С моей строкой конфигурации обработчиков событий перезапуски Actor не происходят.Со ссылкой 1.1.3 перезапуски строки происходят чудесно.

Я сделал это изменение, основываясь на этих инструкциях http://akka.io/docs/akka/1.1.3/general/slf4j.html

Итак, избавившись от предложений на этой странице и перейдявернуться к справке 1.1.3 akka.conf Мне удалось получить отказоустойчивых актеров.

1 голос
/ 17 августа 2011

Я полагаю, что ваша проблема прекращается после отправки сообщений, вы не пытаетесь сохранить работоспособность вашего асинхронного приложения, и поэтому основной поток завершает работу и с ним все сводит.

...