Лучший способ заглянуть в почтовый ящик актера Scala - PullRequest
6 голосов
/ 27 апреля 2010

Используя Scala 2.8 RC1 или новее, какой самый лучший (самый простой и / или самый прямой) метод «заглянуть» в ожидающие сообщения в почтовом ящике актера (из метода акта () того же самого актора) для проверки что находится в очереди, без необходимости реагировать / получать сообщения и / или каким-либо образом нарушать текущее содержимое почтового ящика.

Цель этого заключается в том, чтобы субъект мог определить, безопасно ли обрабатывать запрос на выход, сначала определив, являются ли какие-либо из оставшихся сообщений почтового ящика теми, которые должны быть обработаны, а не просто отброшен путем немедленной остановки субъекта. .

Ответы [ 2 ]

10 голосов
/ 19 июня 2010

Вам не нужно заглядывать вперед. Просто следите за тем, что был запрошен выход, и используйте responseWithin (0), чтобы определить, когда очередь пуста после запроса на выход.

import scala.actors._

sealed case class Message
case object Exit extends Message
case class Unimportant(n:Int) extends Message
case class Important(n:Int) extends Message

class SafeExitingActor extends Actor {
  def act : Nothing = react {
      case Exit => {
           println("exit requested, clearing the queue")
           exitRequested
      }
      case message => {
           processMessage(message, false)
           act
      }
  }

  // reactWithin(0) gives a TIMEOUT as soon as the mailbox is empty
  def exitRequested : Nothing = reactWithin(0) {
     case Exit => {
         println("extra exit requested, ignoring")
         exitRequested // already know about the exit, keep processing
     }
     case TIMEOUT => {
         println("timeout, queue is empty, shutting down")
         exit // TIMEOUT so nothing more to process, we can shut down
     }
     case message => {
         processMessage(message, true)
         exitRequested
     }
  }

  // process is a separate method to avoid duplicating in act and exitRequested
  def processMessage(message : Any, importantOnly : Boolean) = {
     message match {
       case Unimportant(n) if !importantOnly => println("Unimportant " + n)
       case Unimportant(n) => () // do nothing
       case Important(n) => println("Important! " + n)
     }
     Thread sleep 100 // sleep a little to ensure mailbox backlog
  }
}

object TestProcessing {
  def main(args : Array[String]) {
    val actor = new SafeExitingActor()
    actor.start()
    for (i <- 1 to 10) {
        actor ! Unimportant(i)
        actor ! Important(i)
    }
    actor ! Exit
    for (i <- 11 to 20) {
        actor ! Unimportant(i)
        actor ! Important(i)
    }
    actor ! Exit
    actor ! Important(100)
  }
}

Это должно вывести

Unimportant 1
Important! 1
Unimportant 2
Important! 2
Unimportant 3
Important! 3
Unimportant 4
Important! 4
Unimportant 5
Important! 5
Unimportant 6
Important! 6
Unimportant 7
Important! 7
Unimportant 8
Important! 8
Unimportant 9
Important! 9
Unimportant 10
Important! 10
exit requested, clearing the queue
Important! 11
Important! 12
Important! 13
Important! 14
Important! 15
Important! 16
Important! 17
Important! 18
Important! 19
Important! 20
extra exit requested, ignoring
Important! 100
timeout, queue is empty, shutting down
3 голосов
/ 27 апреля 2010

В целом это звучит как опасная операция, поскольку, если есть критические сообщения, субъект, обрабатывающий их, может проверить и не найти ни одного, но затем перед выходом может быть получен другой из какого-либо другого потока.

Если выЗнайте наверняка, что это не может произойти, и вам не нужно много невероятно быстрых переключателей сообщений, я, вероятно, написал бы охранника, который считает и отслеживает критические сообщения, но в противном случае просто передает их другому субъекту дляобработка.

Если вы не хотите этого делать, имейте в виду, что детали внутренних компонентов должны измениться, и вам, возможно, придется просмотреть исходный код Actor, Reactor, MessageQueue и т. д. вчтобы получить то, что вы хотите.На данный момент, что-то вроде этого должно работать (предупреждение, не проверено):

package scala.actors
package myveryownstuff
trait CriticalActor extends Actor {
  def criticalAwaits(p: Any => Boolean) = {
    synchronized {
      drainSendBuffer(mailbox)
      mailbox.get(0)(p).isDefined
    }
  }
}

Обратите внимание, что мы должны поместить расширенную черту в пакет scala.actors, потому что все внутренние компоненты почтового ящика объявлены частными для scalaПакет .actors.(Это хорошее предупреждение, что вы должны быть осторожны, прежде чем связываться с внутренностями.) Затем мы добавляем новый метод, который принимает функцию, которая может проверять критическое сообщение и искать его, используя встроенный метод mailbox.get(n), который возвращаетn th-е сообщение, выполняющее некоторый предикат.

...