Во-первых, вам не нужно явно передавать отправителя, так как отправитель отслеживается платформой актеров Scala.Вы всегда можете получить доступ к отправителю сообщения, используя метод sender
.
Как можно увидеть здесь: scala.actors.MQueue , почтовый ящик актера реализован как связанный список ипоэтому ограничен только размером кучи.
Тем не менее, если вы обеспокоены тем, что производитель работает очень быстро, а потребитель очень медленно, я предлагаю вам изучить механизм регулирования.Но я бы не рекомендовал подход из принятого ответа на вопрос ограничение размера почтового ящика scala .
Попытка отправки сообщений о перегрузке, когда система сильно загружена, не кажется хорошейидея, в общем.Что если ваша система слишком занята, чтобы проверить ее на перегрузку?Что если получатель сообщения о перегрузке слишком занят, чтобы на него воздействовать?Кроме того, удаление сообщений не очень хорошая идея для меня.Я думаю, что вы хотите, чтобы все ваши рабочие элементы обрабатывались надежно.
Кроме того, я бы не стал полагаться на mailboxSize
при определении нагрузки.Вы не можете различить разные типы сообщений и можете проверять только изнутри самого потребителя, а не от производителя.
Я предлагаю использовать подход, при котором потребитель запрашивает больше работы, когда он знает, что может с этим справиться.
Ниже приведен простой пример того, как это можно реализовать.
import scala.actors._
import Actor._
object ConsumerProducer {
def main(args: Array[String]) {
val producer = new Producer(Iterator.range(0, 10000))
val consumer = new Consumer(producer)
}
}
case class Produce(count: Int)
case object Finished
class Producer[T](source: Iterator[T]) extends Actor {
start
def act() {
loopWhile(source.hasNext) {
react {
case Produce(n: Int) => produce(n)
}
}
}
def produce(n: Int) {
println("producing " + n)
var remaining = n
source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
if(!source.hasNext) sender ! Finished
}
}
class Consumer(producer: Actor) extends Actor {
start
private var remaining = 0
def act() {
requestWork()
consume()
}
def consume(): Nothing = react {
case Finished => println("Finished")
case n: Int => work(n); requestWork(); consume()
}
def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }
def work(n: Int) = {
println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
remaining -= 1
}
}