Переполнение почтового ящика актеров.Scala - PullRequest
4 голосов
/ 01 июля 2010

Я сейчас работаю с двумя актерами в Scala. Один, продюсер , производит некоторые данные и отправляет их в парсер . Производитель отправляет сообщение HashMap[String,HashMap[Object,List[Int]]] (вместе с this , чтобы отметить отправителя):

parcer ! (this,data)

Парсер постоянно ждет таких сообщений:

def act(){
    loop{
      react{
        case (producer, data)=> parse(data);
      }
    }
}

Программа отлично работает в нормальных условиях. Проблема связана с большими объемами данных и многими отправленными сообщениями (хэш имеет около 10 ^ 4 элементов, внутренний хэш около 100 элементов и список имеет длину 100), программа вылетает. Это не показывает ни ошибок, ни исключений. Это просто останавливает.

Кажется, проблема в том, что мой продюсер работает намного быстрее, чем парсер (и на данный момент я не хочу больше одного парсера).

После прочтения ограничение размера почтового ящика scala Интересно, достигает ли почтовый ящик моего парсера своего предела? Пост также предлагает некоторые решения, но сначала я должен убедиться, что это проблема. Как я могу это проверить?

Есть ли способ узнать ограничение памяти актера? Как насчет чтения использованной / свободной памяти в почтовом ящике?

Любые предложения для рабочего процесса, которые не были опубликованы в по этой ссылке , также приветствуются.

Спасибо

1 Ответ

4 голосов
/ 03 июля 2010

Во-первых, вам не нужно явно передавать отправителя, так как отправитель отслеживается платформой актеров Scala.Вы всегда можете получить доступ к отправителю сообщения, используя метод sender.

Как можно увидеть здесь: scala.actors.MQueue , почтовый ящик актера реализован как связанный список ипоэтому ограничен только размером кучи.

Тем не менее, если вы обеспокоены тем, что производитель работает очень быстро, а потребитель очень медленно, я предлагаю вам изучить механизм регулирования.Но я бы не рекомендовал подход из принятого ответа на вопрос ограничение размера почтового ящика scala .

Попытка отправки сообщений о перегрузке, когда система сильно загружена, не кажется хорошейидея, в общем.Что если ваша система слишком занята, чтобы проверить ее на перегрузку?Что если получатель сообщения о перегрузке слишком занят, чтобы на него воздействовать?Кроме того, удаление сообщений не очень хорошая идея для меня.Я думаю, что вы хотите, чтобы все ваши рабочие элементы обрабатывались надежно.

Кроме того, я бы не стал полагаться на mailboxSize при определении нагрузки.Вы не можете различить разные типы сообщений и можете проверять только изнутри самого потребителя, а не от производителя.

Я предлагаю использовать подход, при котором потребитель запрашивает больше работы, когда он знает, что может с этим справиться.

Ниже приведен простой пример того, как это можно реализовать.

import scala.actors._
import Actor._

object ConsumerProducer {
  def main(args: Array[String]) {
    val producer = new Producer(Iterator.range(0, 10000))
    val consumer = new Consumer(producer)
  }
}

case class Produce(count: Int)
case object Finished

class Producer[T](source: Iterator[T]) extends Actor {

  start

  def act() {
    loopWhile(source.hasNext) {
      react {
        case Produce(n: Int) => produce(n)
      } 
    }
  }

  def produce(n: Int) {
    println("producing " + n)
    var remaining = n
    source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
    if(!source.hasNext) sender ! Finished
  }
}

class Consumer(producer: Actor) extends Actor {

  start

  private var remaining = 0

  def act() {
    requestWork()
    consume()
  }

  def consume(): Nothing = react {
    case Finished => println("Finished")
    case n: Int => work(n); requestWork(); consume()
  }

  def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }

  def work(n: Int) = {
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
    remaining -= 1
  }
}
...