актеры scala: отбрасывать сообщения, если очередь слишком длинная? - PullRequest
3 голосов
/ 20 мая 2011

Я хотел бы удалить сообщения из почтового ящика актера, если он переполнится. Например, если размер очереди достигает 1000 сообщений, самое старое должно быть удалено.

Ответы [ 2 ]

6 голосов
/ 20 мая 2011

Вы не можете работать с почтовым ящиком напрямую, но вы можете внедрить шаблон Срок действия сообщения поверх существующей библиотеки.

Отправлять дату создания с каждым сообщением:

case class ExpirableMessage(msg: String, createdAt: Long) 

Сканирование почтового ящика с помощью reactWithin(0) и отфильтрование просроченных сообщений:

react{ 
  case msg: ExpirableMessage => 
    // handle the message
    // clean the mailbox with nested react
    reactWithin(0){
        case ExpirableMessage(_, createdAt) if(currentTimeMillis - createdAt > INTERVAL) =>
        case TIMEOUT =>
    }
}
3 голосов
/ 22 мая 2011

Вы также можете ограничить очередь актера в куче и ограничить его использование с помощью прокси-актера.Затем вы можете написать что-то вроде следующего:

// adder actor with a bounded queue size of 4
val adder = boundActor(4) {
  loop {
    react {
      case x: Int => reply(x*2)
    }
  }
}

// test the adder
actor {
  for (i <- 1 to 10) {
    adder !! (i, { case answer: Int => println("Computed " + i + " -> " + answer) })
  }
}

Вот реализация boundedActor.Обратите внимание, что boundedActor всегда должен отвечать своему отправителю, в противном случае невозможно отследить размер его очереди, и boundedActor остановится, отказываясь принимать любые дальнейшие сообщения.

object ActorProxy extends scala.App {

  import scala.actors._
  import scala.actors.Actor._
  import scala.collection.mutable._

  /**
   * Accepts an actor and a message queue size, and 
   * returns a proxy that drops messages if the queue
   * size of the target actor exceeds the given queue size.
   */
  def boundActorQueue(target: Actor, maxQueueLength: Int) = actor {
    val queue = new Queue[Tuple2[Any, OutputChannel[Any]]]
    var lastMessageSender: Option[OutputChannel[Any]] = None

    def replyHandler(response: Any) {
      if (lastMessageSender.get != null) lastMessageSender.get ! response
      if (queue.isEmpty) {
        lastMessageSender = None
      } else {
        val (message, messageSender) = queue.dequeue
        forwardMessage(message, messageSender)
      }
    }

    def forwardMessage(message: Any, messageSender: OutputChannel[Any]) = {
      lastMessageSender = Some(messageSender)
      target !! (message, { case response => replyHandler(response) })
    }

    loop {
      react {
        case message =>
          if (lastMessageSender == None) {
            forwardMessage(message, sender)
          } else {
            queue.enqueue((message, sender))
            // Restrict the queue size
            if (queue.length > maxQueueLength) {
                val dropped = queue.dequeue
                println("!!!!!!!! Dropped message " + dropped._1)
            }
          }
      }
    }
  }

  // Helper method
  def boundActor(maxQueueLength: Int)(body: => Unit): Actor = boundActorQueue(actor(body), maxQueueLength)

}
...