У меня возникла проблема при попытке использования маршрутизаторов akka.net.Мой маршрутизатор циклического перебора не завершает работу после широковещательной рассылки сообщения PoisonPill, если в маршрутизаторе есть Исключение, которое обрабатывается с использованием supervisorstrategy.Если исключение не выбрасывается или обрабатывается с помощью try catch в маршрутизаторе, субъект-маршрутизатор завершает работу нормально.Что-то мне не хватает в моем подходе?
пример кода для воспроизведения проблемы:
using System;
using System.IO;
using Akka.Actor;
using Akka.Event;
using Akka.Routing;
using NLog;
using NLog.Config;
using NLog.Targets;
using LogLevel = NLog.LogLevel;
namespace AkkaTest
{
class Program
{
static void Main(string[] args)
{
InitNlog();
var actorSystem = ActorSystem.Create("ActorSystem");
IActorRef coordinator = actorSystem.ActorOf(Props.Create(() => new Coordinator()));
for (int i = 0; i < 1000; i++)
{
ChildActor.ProcessData processData = new ChildActor.ProcessData(i);
coordinator.Tell(processData);
}
coordinator.Tell(new Coordinator.DisposeAll());
Console.ReadLine();
}
static void InitNlog()
{
// Step 1. Create configuration object
var config = new LoggingConfiguration();
// Step 2. Create targets and add them to the configuration
var consoleTarget = new ColoredConsoleTarget();
config.AddTarget("console", consoleTarget);
// Step 3. Set target properties
consoleTarget.Layout = @"${date:format=HH\:mm\:ss} ${logger} ${message}";
// Step 4. Define rules
var rule1 = new LoggingRule("*", LogLevel.Debug, consoleTarget);
config.LoggingRules.Add(rule1);
// Step 5. Activate the configuration
LogManager.Configuration = config;
}
}
public class Coordinator : ReceiveActor
{
public class DisposeAll
{
}
private readonly ILoggingAdapter _logger = Context.GetLogger();
private IActorRef _consumer;
public Coordinator()
{
Receive<ChildActor.ProcessData>(x => { _consumer.Tell(x); });
Receive<DisposeAll>(x => { _consumer.Tell(x); });
}
protected override void PreStart()
{
if (Context.Child("Consumer").Equals(ActorRefs.Nobody))
{
_consumer = Context.ActorOf(
Props.Create(() => new Consumer())
, "Consumer");
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(ex =>
{
if (ex is InvalidDataException)
{
return Directive.Resume;
}
return Directive.Stop;
});
}
}
public class Consumer : ReceiveActor
{
private readonly ILoggingAdapter _logger = Context.GetLogger();
private IActorRef _childRouter;
private int _progress;
public Consumer()
{
Receive<ChildActor.ProcessData>(x =>
{
_progress++;
if(_progress%100==0) _logger.Info("{0} items pushed to router", _progress);
_childRouter.Forward(x);
});
Receive<Terminated>(x =>
{
_logger.Error("Child Router terminated.");
});
Receive<Coordinator.DisposeAll>(x => { _childRouter.Forward(new Broadcast(PoisonPill.Instance)); });
}
protected override void PreStart()
{
if (Context.Child("ChildRouter").Equals(ActorRefs.Nobody))
{
_childRouter =
Context.ActorOf(
Props.Create(() => new ChildActor())
.WithRouter(new RoundRobinPool(100))
.WithSupervisorStrategy(new OneForOneStrategy(ex => Directive.Escalate)), "ChildRouter");
Context.Watch(_childRouter);
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(ex => Directive.Escalate);
}
}
public class ChildActor : ReceiveActor
{
public class ProcessData
{
public int Data { get; private set; }
public ProcessData(int data)
{
Data = data;
}
}
private readonly ILoggingAdapter _logger = Context.GetLogger();
public ChildActor()
{
Receive<ProcessData>(x =>
{
if (x.Data % 5 == 0)
{
_logger.Info("{0} is Divisible by 5", x.Data);
}
else
{
//if this line is commented, router terminates just fine
throw new InvalidDataException("Error while processing.");
}
});
}
}
}