Использование Scala Actors для создания чего-то похожего на конвейер - PullRequest
6 голосов
/ 17 декабря 2009

Я борюсь со следующей проблемой уже неделю, и мне нужен совет.

def query(title: String): List[Search]   // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]

def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]

def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]

Я хочу построить трубопровод как:

query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate                  (collected-meta-infos-state per query)
   => List[  TerminatorI-List[MetaInfo],  TerminatorII-List[MetaInfo],  ...]

Пока что я реализовал каждый сегмент конвейера как актер. Мне нужно создавать выделенные экземпляры актеров для каждого запроса, так как некоторые из таких акторов, как filterXXX и объединение, должны поддерживать состояние для каждого запроса.

Такие функции, как askIMDB, дают несколько результатов, которые я хочу обрабатывать одновременно (каждая для отдельного субъекта). Поэтому я не нашел способа предварительно построить весь граф акторов перед выполнением запроса () и ни один изящный способ изменить его во время выполнения.

Моей первой попыткой была цепочка актеров, которые передавали в сообщениях как идентификаторы транзакций, поэтому у каждого актера была карта [TransactionID-> State], но это выглядело довольно уродливо. Вторая попытка состояла в том, чтобы создать что-то вроде конвейера, абстрагирующего орграф актеров в один поток, но до сих пор я потерпел неудачу.

Это мой первый пост, извините, если я что-то забыл или вопрос к общему / псевдокодированному. Любой совет очень ценится. Спасибо!

1 Ответ

4 голосов
/ 18 декабря 2009

Я предлагаю вам взглянуть на ScalaQuery , который делает примерно то же самое. И это можно сделать, потому что это проблема монады. На самом деле, некоторые решения на Haskell, такие как Arrows, которые реализованы библиотекой Scalaz , кажутся довольно близкими.

Это было бы лучшим решением, так как правильная абстракция облегчит изменения в будущем.

Как хак, я придумаю что-то вроде этого:

abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate

class Query(title: String) {
  self =>

  // Create actors
  def createActor(qm: QueryModifiers): Actor = {
    val actor = qm match {
      case Consolidate => // create a consolidator actor
      case //... as needed
    }
    actor.start
    actor
  }

  // The pipeline
  val pipe: List[List[QueryModifiers]] = Nil

  // Build the pipeline
  def ->(qms: List[QueryModifiers]) = new Query(title) {
    override val pipe = qms :: self.pipe
  }
  def ->(qm: QueryModifiers) = new Query(title) {
    override val pipe = List(qm) :: self.pipe
  }
  def ->(c: Consolidate.type) = {
    // Define the full pipeline
    // Because the way pipe is built, the last layer comes first, and the first comes last
    val pipeline = Consolidate :: pipe

    // Create an actor for every QueryModifier, using an unspecified createActor function
    val actors = pipeline map (_ map (createActor(_))

    // We have a list of lists of actors now, where the first element of the list
    // was the last QueryModifiers we received; so, group the layers by two, and for each
    // pair, make the second element send the result to the first.
    // Since each layer can contain many actors, make each member of the second
    // layer send the results to each member of the first layer.
    // The actors should be expecting to receive message SendResultsTo at any time.
    for {
      List(nextLayer, previousLayer) <- actors.iterator sliding 2
      nextActor <- nextLayer
      previousActor <- previousLayer
    } previousActor ! SendResultsTo(nextActor)

    // Send the query to the first layer
    for ( firstActor <- actors.last ) firstActor ! Query(title)

    // Get the result from the last layer, which is the consolidator
    val results = actors.head.head !? Results

    // Return the results
    results
  }
}

EDIT

Вы также можете гарантировать заказ, с небольшой хитростью. Я пытаюсь избежать Scala 2.8, хотя это может сделать это намного проще с именованными параметрами и параметрами по умолчанию.

sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers

class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {

// Build the pipeline
  def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
    case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
    case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
    case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
    case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
  }
  // Do similarly for qm: NextQM

  // Consolidation
  def ->(qm: Consolidate.type) = {
     // Create Searchers actors
     // Send them the Filters
     // Send them Fetchers
     // Create the Consolidator actor
     // Send it to Searchers actors
     // Send Searchers the query
     // Ask Consolidator for answer
  }
}

object Query {
  def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}

Теперь актеры Searchers хранят список фильтров, список сборщиков и ссылку на консолидатор. Они слушают сообщения, информирующие их об этих вещах, и для запроса. Для каждого результата они создают субъект фильтра для каждого фильтра в списке, отправляют каждому из них список сборщиков и объединитель, а затем отправляют им результат.

Актеры фильтра хранят список сборщиков и ссылку на консолидатор. Они слушают сообщения, информирующие их об этих вещах и о результатах поиска. Они отправляют свои выходные данные, если таковые имеются, вновь созданным актерам сборщика, которые сначала получают информацию о консолидаторе.

Сборщики сохраняют ссылку на консолидаторов. Они слушают сообщение, информирующее их об этой ссылке, и о результате из фильтра. Они, в свою очередь, отправляют свой результат в консолидатор.

Консолидатор прослушивает два сообщения. Одно сообщение от актеров сборщика информирует их о результатах, которые они накапливают. Другое сообщение, поступающее из запроса, запрашивает тот результат, который он возвращает.

Осталось только придумать способ сообщить консолидатору, что все результаты обработаны. Один из способов будет следующим:

  1. В Запросе сообщайте актеру Консолидатора о каждом Созданном Искателе. Консолидатор хранит их список с флагом, указывающим, завершены они или нет.
  2. Каждый поисковик хранит список фильтров, которые он создал, и ожидает от них сообщения «сделано». Когда у поисковика не осталось обработки, и он получил «выполнено» от всех фильтров, он отправляет сообщение в консолидатор, информирующее его о завершении.
  3. Каждый фильтр, в свою очередь, хранит список средств извлечения , которые он создал, и аналогичным образом ожидает от них "готовых" сообщений. Когда он завершил обработку и получил «выполнено» от всех сборщиков, он сообщает поисковику, что он сделал.
  4. Сборщик отправляет сообщение «готово» фильтру, который его создал, когда его работа завершена и отправлена ​​в консолидатор.
  5. Консолидатор прослушивает сообщение, запрашивающее результат только после того, как получил "сделано" от всех поисковиков.
...