У меня есть ProcessOrdersActor
, который выглядит примерно так, как показано ниже. Он получает команду ProcessABatchOfOrders
, которую затем делегирует своим дочерним элементам (OrderHandler
), которая выполняется с RoundRobinPool
. Дети отвечают либо OrderFailedToProcess
, либо FinishedWithOrder
. Если ProcessOrdersActor
получает один OrderFailedToProcess
, то вся партия должна считаться зараженной, и все будет прервано.
Это работает нормально, кроме случаев, когда я остановлю OrderHandler
, он все равно продолжит обрабатывать текущую сообщение (что, конечно, по замыслу). Из-за этого он отправит FinishedWithOrder
своему родителю. Проблема заключается в том, что родительский элемент уже начал работать с другой партией заказов и поэтому будет смешивать заказы.
Я хочу обработать sh оставшиеся FinishedWithOrder
, прежде чем я начну обрабатывать новый ProcessABatchOfOrders
. Возможно ли это?
У меня была идея, что это можно решить с помощью пользовательского Mailbox
, который устанавливает приоритет FinishedWithOrder
, а затем проверяет, что они получены как мертвые буквы в Ready
-состоянии. Это, однако, создает второй вопрос, как использовать пользовательский Mailbox
в модульном тесте на основе TestKit
?
public class ProcessOrdersActor: ReceiveActor, IWithUnboundedStash
{
private IActorRef orderHandlers;
public ProcessOrdersActor()
{
this.orderHandlers = Context
.ActorOf(Props.Create(() => new OrderHandler())
.WithRouter(new RoundRobinPool(10))), "OrderHandler");
Ready();
}
private void BecomeReady()
{
UnbecomeStacked();
Stash.UnstashAll();
}
private void Ready()
{
Receive<ProcessABatchOfOrders>(cmd=>
{
foreach (var order in cmd.Orders)
{
// Delegate processing to children in router
orderHandler.Tell(new ProcessOrder(order),
}
BecomeStacked(() => WaitingForProcesses());
});
}
private void WaitingForProcesses()
{
Receive<OrderFailedToProcess>(msg =>
{
// One order failed. Then this entire batch should abort.
// It can also be done with Supervision, but here I stop the children with Stop().
this.orderHandlers.Stop();
// Here I want to make sure that no message from orderHandlers will return here.
BecomeReady();
});
Receive<FinishedWithOrder>(msg =>
{
// Here I will receive a from previous FinishedWithOrder batch, which I do not want.
// Do stuff. And when all orders are processed I will swap state again...
if(everythingIsDone)
BecomeReady();
});
ReceiveAny(o => Stash.Stash());
}
}