Игнорировать сообщения, полученные после Stop () в Akka.net - PullRequest
0 голосов
/ 28 февраля 2020

У меня есть ProcessOrdersActor, который выглядит примерно так, как показано ниже. Он получает команду ProcessABatchOfOrders, которую затем делегирует своим дочерним элементам (OrderHandler), которая выполняется с RoundRobinPool. Дети отвечают либо OrderFailedToProcess, либо FinishedWithOrder. Если ProcessOrdersActor получает один OrderFailedToProcess, то вся партия должна считаться зараженной, и все будет прервано.

Это работает нормально, кроме случаев, когда я остановлю OrderHandler, он все равно продолжит обрабатывать текущую сообщение (что, конечно, по замыслу). Из-за этого он отправит FinishedWithOrder своему родителю. Проблема заключается в том, что родительский элемент уже начал работать с другой партией заказов и поэтому будет смешивать заказы.

Я хочу обработать sh оставшиеся FinishedWithOrder, прежде чем я начну обрабатывать новый ProcessABatchOfOrders. Возможно ли это?

У меня была идея, что это можно решить с помощью пользовательского Mailbox, который устанавливает приоритет FinishedWithOrder, а затем проверяет, что они получены как мертвые буквы в Ready -состоянии. Это, однако, создает второй вопрос, как использовать пользовательский Mailbox в модульном тесте на основе TestKit?

    public class ProcessOrdersActor: ReceiveActor, IWithUnboundedStash
    {
        private IActorRef orderHandlers;

        public ProcessOrdersActor()
        {
             this.orderHandlers = Context
                .ActorOf(Props.Create(() => new OrderHandler())
                .WithRouter(new RoundRobinPool(10))), "OrderHandler");
            Ready();
        }

        private void BecomeReady()
        {
            UnbecomeStacked();
            Stash.UnstashAll();
        }

        private void Ready()
        {
            Receive<ProcessABatchOfOrders>(cmd=>
            {
                foreach (var order in cmd.Orders)
                {
                    // Delegate processing to children in router
                    orderHandler.Tell(new ProcessOrder(order), 
                }
                BecomeStacked(() => WaitingForProcesses());
            });
        }

        private void WaitingForProcesses()
        {

            Receive<OrderFailedToProcess>(msg =>
            {
                // One order failed. Then this entire batch should abort.

                // It can also be done with Supervision, but here I stop the children with Stop().
                this.orderHandlers.Stop();
                // Here I want to make sure that no message from orderHandlers will return here.
                BecomeReady();
            });

            Receive<FinishedWithOrder>(msg =>
            {
                // Here I will receive a from previous FinishedWithOrder batch, which I do not want.

                // Do stuff. And when all orders are processed I will swap state again...
                if(everythingIsDone)
                    BecomeReady();            
            });

            ReceiveAny(o => Stash.Stash());
        }
}
...