Вопросы
- Как вы получаете и регистрируете сообщения в
receiveCommand
постоянного
актер с AtLeastOnceDelivery
внутри пользовательского почтового ящика ставит в очередь и удаляет?
- Как вы можете получить имя постоянного субъекта в вашей пользовательской очереди почтовых ящиков и в очереди?
У меня есть специальный почтовый ящик со следующим MessageQueue:
Проблема в том, что сообщения, полученные постоянным действующим лицом в receiveCommand
, вообще не регистрируются в очереди или в очереди. Только персистирование зарегистрировано.
Кроме того, owner.get.path.name
в моем коде ниже никогда не получает имена постоянных актеров. Это получает имена как recoveryPermitter
и inmemory-snapshot-store
.
class CustomMailbox(val backend: MessageQueue, owner: Option[ActorRef], system: Option[ActorSystem]) extends MessageQueue {
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
// Simulate DROP
println(s"messageType in ${owner.get.path.name}"+handle.message.getClass.toString)
val canEnqueue = Math.random() > 0.95
println(s"enqueing for ${owner.get.path}: $handle")
backend.enqueue(receiver, handle)
}
override def dequeue(): Envelope = {
val mb:UnboundedMailbox.MessageQueue = backend.asInstanceOf[UnboundedMailbox.MessageQueue]
val peek = mb.queue.peek()
println(s"peeking for ${owner.get.path.name}: $peek")
if(peek != null) {
println(s"messageType in degueue ${owner.get.path.name}"+peek.message.getClass.toString)
val canDequeue = Math.random() > 0.9
println(s"dequeing for ${owner.get.path}: $peek")
backend.dequeue()
}
else
{
null
}
}
override def numberOfMessages: Int = backend.numberOfMessages
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = backend.cleanUp(owner, deadLetters)
override def hasMessages: Boolean = backend.hasMessages
}
Например, у меня есть постоянный актер с именем childclient
и перечисленными ниже методами receiveCommand
и updateState
: мои очереди и очереди в моем настраиваемом почтовом ящике выше будут регистрировать что-то вроде:
Enqueueing
enqueing for akka://WebShop/system/inmemory-journal: Envelope(WriteMessages(Vector(AtomicWrite(List(PersistentImpl(Goods(goods,5),1,my-Client-id,,false,Actor[akka://WebShop/user/webShopActor#-659139690],be7c37ad-9c0e-4d77-84ec-d9fd94b5f623)))),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822],2),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822])
освобождение пакета из очереди / выглядывал
peeking for inmemory-journal:
Envelope(WriteMessages(Vector(AtomicWrite(List(PersistentImpl(Goods(goods,5),1,my-Client-id,,false,Actor[akka://WebShop/user/webShopActor#-659139690],be7c37ad-9c0e-4d77-84ec-d9fd94b5f623)))),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822],2),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822])
Это показывает, что почтовый ящик обрабатывает постоянное событие Goods
после получения командного сообщения BuyGoods
в моем ChildClient
. Он регистрирует только сохранение Goods
, но не регистрирует постановку в очередь или снятие с очереди (просмотр) BuyGoods
.
def updateState(evt: Evt): Unit = evt match {
case ReduceBalance(deliveryId, amount) if state.CashBal - amount < 0 ⇒
confirmDelivery(deliveryId)
throw InsufficientBalance(s"Client cannot withdraw $amount from ${state.CashBal}")
case e: ReduceBalance ⇒ state = ClientState(state.CashBal - e.amount)
confirmDelivery(e.deliveryId)
deliver(paymentsActor)(deliveryId => PaymentMessage(deliveryId, e.amount))
case e: Goods =>
deliver(stockActor)(deliveryId => ReduceStock(deliveryId, e.name, e.amount))
case e: AddCredit =>
state = ClientState(state.CashBal + e.amount)
case e: PaymentAccepted =>
log.info("client confirmed payment:" + e)
confirmDelivery(e.deliveryId)
}
override def receiveCommand: Receive = {
//request for goods from stockActor
case c: BuyGoods =>
log.info("Client Recieved: " + c)
persistAsync(Goods(c.name, c.Amount))(updateState)
//payments actor confirms payment
}