Я сейчас пытаюсь начать работу с Akka и столкнулся со странной проблемой. У меня есть следующий код для моего актера:
class AkkaWorkerFT extends Actor {
def receive = {
case Work(n, c) if n < 0 => throw new Exception("Negative number")
case Work(n, c) => self reply n.isProbablePrime(c);
}
}
И вот как я начинаю свои рабочие:
val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()
И вот как я все выключил:
futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill
Теперь, что происходит, если я отправляю рабочим сообщения с n> 0 (исключение не выдается), все работает нормально, и приложение корректно завершает работу. Тем не менее, как только я отправляю ему одно сообщение, которое приводит к исключению, приложение не завершает работу, потому что все еще выполняется действующий субъект, но я не могу понять, откуда оно.
Если это поможет, это стек рассматриваемой нити:
Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: 158
AbstractQueuedSynchronizer$ConditionObject.await() line: 1987
LinkedBlockingQueue<E>.take() line: 399
ThreadPoolExecutor.getTask() line: 947
ThreadPoolExecutor$Worker.run() line: 907
MonitorableThread(Thread).run() line: 680
MonitorableThread.run() line: 182
PS: Поток, который не завершается, не является ни одним из рабочих потоков, потому что я добавил обратный вызов postStop, каждый из которых останавливается правильно.
PPS: Actors.registry.shutdownAll
обходит проблему, но я думаю, что shutdownAll следует использовать только в качестве крайней меры, не так ли?