Скала актеры оставили висеть - PullRequest
2 голосов
/ 11 января 2012

Я порождаю небольшое количество актеров для извлечения, обработки и сохранения элементов RSS-ленты в базе данных.Это делается с помощью основного метода объекта, работающего на cron.Я создаю этих актеров и раздаю им задания по мере того, как они выполняют предыдущую работу, назначенную им.Мой основной класс порождает одного актера, который распределяет работу среди группы актеров.В конце концов основной метод, кажется, зависает.Это не выходит, но выполнение останавливается на всех актерах.Мой технический директор считает, что главное - выйти, прежде чем актеры завершат свою работу и покинут их, но я не уверен, что это так.Я не получаю успешный выход на мейн (вообще нет выхода).

По сути, мне интересно, как отлаживать этих актеров, и какая возможная причина могла бы вызвать это.Будет ли основной выход до того, как актеры завершат свое выполнение (и если это так, имеет ли это значение?) Из того, что я могу сказать, актеры, использующие получение, отображаются 1-в-1 в потоки, правильно?Код ниже.Пожалуйста, задавайте любые дополнительные вопросы, помощь очень ценится.Я знаю, что, возможно, не предоставил достаточно подробностей, я новичок в scala и актерах и буду обновлять по мере необходимости.

object ActorTester {
  val poolSize = 10
  var pendingQueue :Set[RssFeed] = RssFeed.pendingQueue

  def main(args :Array[String]) {
    val manager = new SpinnerManager(poolSize, pendingQueue)
    manager.start
  }
}

case object Stop

class SpinnerManager(poolSize :Int = 1, var pendingQueue :Set[RssFeed]) extends Actor {
  val pool = new Array[Spinner](poolSize)

  override def start() :Actor = {
    for (i <- 0 to (poolSize - 1)) {
      val spinner = new Spinner(i)
      spinner.start()
      pool(i) = spinner
    }
    super.start
  }

  def act() {
    for {
      s <- pool
      if (!pendingQueue.isEmpty)
     } {
       s ! pendingQueue.head
       pendingQueue = pendingQueue.tail
     }

    while(true) {
      receive {
        case id :Int => {
          if (!pendingQueue.isEmpty) {
            pool(id) ! pendingQueue.head
            pendingQueue = pendingQueue.tail             
          } else if ((true /: pool) { (done, s) => {
            if (s.getState != Actor.State.Runnable) {
              val exited = future {
                s ! Stop
                done && true
              }
              exited()
            } else {
              done && false
            }
          }}) {
            exit
          }
        } 
      }
    }
  }
}

class Spinner(id :Int) extends Actor {
  def act() {
    while(true) {
      receive {
        case dbFeed :RssFeed => {
          //process rss feed
          //this has multiple network requests, to the original blogs, bing image api
          //our instance of solr - some of these spawn their own actors
          sender ! id
        }
        case Stop => exit
      }
    }
  }
}

Ответы [ 2 ]

2 голосов
/ 11 января 2012

Во-первых, вы делаете крошечную, но важную ошибку, когда складываете влево, чтобы определить, все ли актеры Спиннера «завершили» или нет. Что вы должны сделать, это оценить done && true соотв. done && false в конце кейса, но сейчас вы просто говорите true соотв. false без учета done.

Например, представьте, что у вас есть 4 актера Spinner, где первый и второй были Runnable, третий not и четвертый снова Runnable. В этом случае результат вашего фолд-лева будет true, несмотря на то, что третий актер еще не закончил. Если бы вы использовали логический &&, вы бы получили правильный результат.

Возможно, это также приводит к зависанию вашего приложения.

РЕДАКТИРОВАТЬ: Также была проблема с состоянием гонки. Следующий код работает сейчас, надеюсь, это поможет. В любом случае, мне было интересно, разве актерская реализация Scala автоматически не использует рабочие потоки?

import actors.Actor
import scala.collection.mutable.Queue

case class RssFeed()

case class Stop()

class Spinner(id: Int) extends Actor {
  def act() {
    loop {
      react {
        case dbFeed: RssFeed => {
          // Process RSS feed
          sender ! id
        }
        case Stop => exit()
      }
    }
  }
}

class SpinnerManager(poolSize: Int, pendingQueue: Queue[RssFeed]) extends Actor {
  val pool = Array.tabulate(poolSize)(new Spinner(_).start())

  def act() {
    for (s <- pool; if (!pendingQueue.isEmpty)) {
      pendingQueue.synchronized {
        s ! pendingQueue.dequeue()
      }
    }

    loop {
      react {
        case id: Int => pendingQueue.synchronized {
          if (!pendingQueue.isEmpty) {
            Console println id
            pool(id) ! pendingQueue.dequeue()
          } else {
            if (pool forall (_.getState != Actor.State.Runnable)) {
              pool foreach (_ ! Stop)
              exit()
            }
          }
        }
      }
    }
  }

}

object ActorTester {
  def main(args: Array[String]) {
    val poolSize = 10
    val pendingQueue: Queue[RssFeed] = Queue.tabulate(100)(_ => RssFeed())
    new SpinnerManager(poolSize, pendingQueue).start()
  }
}
0 голосов
/ 15 января 2012

Итак, после нескольких дней отладки я решил эту проблему. Предложения кода fotNelton были очень полезны, поэтому я дал ему право голоса. Тем не менее, они не решили саму проблему. Я обнаружил, что если вы выполняете это в методе main, то если родительские акторы завершают работу перед своими дочерними акторами, программа будет зависать вечно и никогда не завершится, сохраняя при этом всю свою память. В процессе обработки RSS-канала сборщик порождает актеров и отправляет им сообщения для выполнения действий, связанных с сетевыми запросами. Эти актеры должны завершить свою работу до того, как родительский актер уйдет. Фетчер не стал бы ждать, пока эти актеры закончат, после того, как он отправит сообщение, он просто двинется дальше. Поэтому он скажет менеджеру, что он закончил, прежде чем его дети-актеры закончили всю свою работу. Чтобы справиться с этим, одним из вариантов будет использование фьючерсов и ожидание, когда актеры закончат (довольно медленно). Мое решение состояло в том, чтобы создать службы, доступные через URL (POST для службы, у которой есть субъект, ожидающий реакции). Служба ответит сразу и отправит сообщение своему действующему лицу. Таким образом, актеры могут выйти, как только они отправят запрос в службу, и не нужно создавать других актеров.

object FeedFetcher {
  val poolSize = 10
  var pendingQueue :Queue[RssFeed] = RssFeed.pendingQueue

  def main(args :Array[String]) {
    new FetcherManager(poolSize, pendingQueue).start
  }
}

case object Stop

class FetcherManager(poolSize :Int = 1, var pendingQueue :Queue[RssFeed]) extends Actor {
  val pool = new Array[Fetcher](poolSize)
  var numberProcessed = 0

  override def start() :Actor = {
    for (i <- 0 to (poolSize - 1)) {
      val fetcher = new Fetcher(i)
      fetcher.start()
      pool(i) = fetcher
    }
    super.start
  }

  def act() {
    for {
      f <- pool
      if (!pendingQueue.isEmpty)
     } {
      pendingQueue.synchronized {
        f ! pendingQueue.dequeue
      }
    }

    loop {
      reactWithin(10000L) {
        case id :Int => pendingQueue.synchronized {
          numberProcessed = numberProcessed + 1
          if (!pendingQueue.isEmpty) {
            pool(id) ! pendingQueue.dequeue             
          } else if ((true /: pool) { (done, f) => {
            if (f.getState == Actor.State.Suspended) {
              f ! Stop
              done && true
            } else if (f.getState == Actor.State.Terminated) {
              done && true
            } else {
              false
            }
          }}) {
            pool foreach { f => {
              println(f.getState)
            }}
            println("Processed " + numberProcessed + " feeds total.")
            exit
          }
        }
        case TIMEOUT => {
          if (pendingQueue.isEmpty) {
            println("Manager just woke up from timeout with all feeds assigned.")
            pool foreach { f => {
              if (f.getState == Actor.State.Suspended) {
                println("Sending Stop to Fetcher " + f.id)
                f ! Stop
              }
            }}
            println("Checking state of all Fetchers for termination.")
            if ((true /: pool) { (done, f) => {
              done && (f.getState == Actor.State.Terminated)
            }}) {
              exit
            }
          }
        }
      }
    }
  }
}

class Fetcher(val id :Int) extends Actor {
  var feedsIveDone = 0
  def act() {
    loop {
      react {
        case dbFeed :RssFeed => {
          println("Fetcher " + id + " starting feed")
          //process rss feed here
          feedsIveDone = feedsIveDone + 1
          sender ! id
        }
        case Stop => {
          println(id + " exiting")
          println(feedsIveDone)
          exit
        }
      }
    }
  }
...