Akka Actor не завершается, если выбрасывается исключение - PullRequest
73 голосов
/ 30 мая 2011

Я сейчас пытаюсь начать работу с Akka и столкнулся со странной проблемой. У меня есть следующий код для моего актера:

class AkkaWorkerFT extends Actor {
  def receive = {
    case Work(n, c) if n < 0 => throw new Exception("Negative number")
    case Work(n, c) => self reply n.isProbablePrime(c);
  }
}

И вот как я начинаю свои рабочие:

val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()

И вот как я все выключил:

futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill

Теперь, что происходит, если я отправляю рабочим сообщения с n> 0 (исключение не выдается), все работает нормально, и приложение корректно завершает работу. Тем не менее, как только я отправляю ему одно сообщение, которое приводит к исключению, приложение не завершает работу, потому что все еще выполняется действующий субъект, но я не могу понять, откуда оно.

Если это поможет, это стек рассматриваемой нити:

  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) 
    Unsafe.park(boolean, long) line: not available [native method]  
    LockSupport.park(Object) line: 158  
    AbstractQueuedSynchronizer$ConditionObject.await() line: 1987   
    LinkedBlockingQueue<E>.take() line: 399 
    ThreadPoolExecutor.getTask() line: 947  
    ThreadPoolExecutor$Worker.run() line: 907   
    MonitorableThread(Thread).run() line: 680   
    MonitorableThread.run() line: 182   

PS: Поток, который не завершается, не является ни одним из рабочих потоков, потому что я добавил обратный вызов postStop, каждый из которых останавливается правильно.

PPS: Actors.registry.shutdownAll обходит проблему, но я думаю, что shutdownAll следует использовать только в качестве крайней меры, не так ли?

Ответы [ 3 ]

21 голосов
/ 11 ноября 2011

Надлежащим способом решения проблем внутри акторов akka является не исключение, а установка иерархий супервизора

"Создание исключения в параллельном коде (предположим, что мы используем несвязанные актеры), просто просто взорвать нить, которая в настоящее время исполняет актера.

Нет никакого способа узнать, что все пошло не так (кроме проверка трассировки стека). С этим ничего не поделаешь. "

см. Отказоустойчивость через иерархии супервизоров (1.2)

* примечание * выше справедливо для старых версий Akka (1.2) В более новых версиях (например, 2.2) вы по-прежнему устанавливаете иерархию супервизора, но она будет перехватывать исключения, создаваемые дочерними процессами. например,

class Child extends Actor {
    var state = 0
    def receive = {
      case ex: Exception ⇒ throw ex
      case x: Int        ⇒ state = x
      case "get"         ⇒ sender ! state
    }
  }

и в супервизоре:

class Supervisor extends Actor {
    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
        case _: ArithmeticException      ⇒ Resume
        case _: NullPointerException     ⇒ Restart
        case _: IllegalArgumentException ⇒ Stop
        case _: Exception                ⇒ Escalate
      }

    def receive = {
      case p: Props ⇒ sender ! context.actorOf(p)
    }
  }

см. Отказоустойчивость через иерархии супервизора (2.2)

2 голосов
/ 05 января 2012

Отключение регистрации, чтобы убедиться, что все заканчивается, как предложил Виктор, немного странно.Вместо этого вы можете:

EventHandler.shutdown()

, который корректно завершает работу всех прослушивателей (регистраторов), поддерживающих работу мира после исключения:

def shutdown() {
  foreachListener(_.stop())
  EventHandlerDispatcher.shutdown()
}
0 голосов
/ 09 ноября 2011

Поворот регистратора на akka.conf

...