Работа с пользовательскими почтовыми ящиками и постоянными участниками с AtLeastOnceDelivery - PullRequest
1 голос
/ 15 марта 2019

Вопросы

  1. Как вы получаете и регистрируете сообщения в receiveCommand постоянного актер с AtLeastOnceDelivery внутри пользовательского почтового ящика ставит в очередь и удаляет?
    1. Как вы можете получить имя постоянного субъекта в вашей пользовательской очереди почтовых ящиков и в очереди?

У меня есть специальный почтовый ящик со следующим MessageQueue:

Проблема в том, что сообщения, полученные постоянным действующим лицом в receiveCommand, вообще не регистрируются в очереди или в очереди. Только персистирование зарегистрировано. Кроме того, owner.get.path.name в моем коде ниже никогда не получает имена постоянных актеров. Это получает имена как recoveryPermitter и inmemory-snapshot-store.

class CustomMailbox(val backend: MessageQueue, owner: Option[ActorRef], system: Option[ActorSystem]) extends MessageQueue {

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
        // Simulate DROP
        println(s"messageType in  ${owner.get.path.name}"+handle.message.getClass.toString)
        val canEnqueue = Math.random() > 0.95
        println(s"enqueing for ${owner.get.path}: $handle")
        backend.enqueue(receiver, handle)
    }

    override def dequeue(): Envelope = {
        val mb:UnboundedMailbox.MessageQueue = backend.asInstanceOf[UnboundedMailbox.MessageQueue]
        val peek = mb.queue.peek()
        println(s"peeking for ${owner.get.path.name}: $peek")
        if(peek != null) {
            println(s"messageType in degueue ${owner.get.path.name}"+peek.message.getClass.toString)
            val canDequeue = Math.random() > 0.9
                println(s"dequeing for ${owner.get.path}: $peek")
                backend.dequeue()
        }

        else
        {
            null
        }
    }

    override def numberOfMessages: Int = backend.numberOfMessages

    override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = backend.cleanUp(owner, deadLetters)

    override def hasMessages: Boolean = backend.hasMessages
}

Например, у меня есть постоянный актер с именем childclient и перечисленными ниже методами receiveCommand и updateState: мои очереди и очереди в моем настраиваемом почтовом ящике выше будут регистрировать что-то вроде:

Enqueueing

 enqueing for akka://WebShop/system/inmemory-journal: Envelope(WriteMessages(Vector(AtomicWrite(List(PersistentImpl(Goods(goods,5),1,my-Client-id,,false,Actor[akka://WebShop/user/webShopActor#-659139690],be7c37ad-9c0e-4d77-84ec-d9fd94b5f623)))),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822],2),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822])

освобождение пакета из очереди / выглядывал

peeking for inmemory-journal: 
Envelope(WriteMessages(Vector(AtomicWrite(List(PersistentImpl(Goods(goods,5),1,my-Client-id,,false,Actor[akka://WebShop/user/webShopActor#-659139690],be7c37ad-9c0e-4d77-84ec-d9fd94b5f623)))),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822],2),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822])

Это показывает, что почтовый ящик обрабатывает постоянное событие Goods после получения командного сообщения BuyGoods в моем ChildClient. Он регистрирует только сохранение Goods, но не регистрирует постановку в очередь или снятие с очереди (просмотр) BuyGoods.

 def updateState(evt: Evt): Unit = evt match {
    case ReduceBalance(deliveryId, amount) if state.CashBal - amount < 0 ⇒
      confirmDelivery(deliveryId)
      throw InsufficientBalance(s"Client cannot withdraw $amount from ${state.CashBal}")
    case e: ReduceBalance ⇒ state = ClientState(state.CashBal - e.amount)
      confirmDelivery(e.deliveryId)
      deliver(paymentsActor)(deliveryId => PaymentMessage(deliveryId, e.amount))
    case e: Goods =>
      deliver(stockActor)(deliveryId => ReduceStock(deliveryId, e.name, e.amount))
    case e: AddCredit =>
      state = ClientState(state.CashBal + e.amount)
    case e: PaymentAccepted =>
      log.info("client confirmed payment:" + e)
      confirmDelivery(e.deliveryId)
  }


  override def receiveCommand: Receive = {
    //request for goods from stockActor

    case c: BuyGoods =>
      log.info("Client Recieved: " + c)
      persistAsync(Goods(c.name, c.Amount))(updateState)
    //payments actor confirms payment

  }
...