Я предлагаю вам взглянуть на ScalaQuery , который делает примерно то же самое. И это можно сделать, потому что это проблема монады. На самом деле, некоторые решения на Haskell, такие как Arrows, которые реализованы библиотекой Scalaz , кажутся довольно близкими.
Это было бы лучшим решением, так как правильная абстракция облегчит изменения в будущем.
Как хак, я придумаю что-то вроде этого:
abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate
class Query(title: String) {
self =>
// Create actors
def createActor(qm: QueryModifiers): Actor = {
val actor = qm match {
case Consolidate => // create a consolidator actor
case //... as needed
}
actor.start
actor
}
// The pipeline
val pipe: List[List[QueryModifiers]] = Nil
// Build the pipeline
def ->(qms: List[QueryModifiers]) = new Query(title) {
override val pipe = qms :: self.pipe
}
def ->(qm: QueryModifiers) = new Query(title) {
override val pipe = List(qm) :: self.pipe
}
def ->(c: Consolidate.type) = {
// Define the full pipeline
// Because the way pipe is built, the last layer comes first, and the first comes last
val pipeline = Consolidate :: pipe
// Create an actor for every QueryModifier, using an unspecified createActor function
val actors = pipeline map (_ map (createActor(_))
// We have a list of lists of actors now, where the first element of the list
// was the last QueryModifiers we received; so, group the layers by two, and for each
// pair, make the second element send the result to the first.
// Since each layer can contain many actors, make each member of the second
// layer send the results to each member of the first layer.
// The actors should be expecting to receive message SendResultsTo at any time.
for {
List(nextLayer, previousLayer) <- actors.iterator sliding 2
nextActor <- nextLayer
previousActor <- previousLayer
} previousActor ! SendResultsTo(nextActor)
// Send the query to the first layer
for ( firstActor <- actors.last ) firstActor ! Query(title)
// Get the result from the last layer, which is the consolidator
val results = actors.head.head !? Results
// Return the results
results
}
}
EDIT
Вы также можете гарантировать заказ, с небольшой хитростью. Я пытаюсь избежать Scala 2.8, хотя это может сделать это намного проще с именованными параметрами и параметрами по умолчанию.
sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers
class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {
// Build the pipeline
def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
}
// Do similarly for qm: NextQM
// Consolidation
def ->(qm: Consolidate.type) = {
// Create Searchers actors
// Send them the Filters
// Send them Fetchers
// Create the Consolidator actor
// Send it to Searchers actors
// Send Searchers the query
// Ask Consolidator for answer
}
}
object Query {
def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}
Теперь актеры Searchers хранят список фильтров, список сборщиков и ссылку на консолидатор. Они слушают сообщения, информирующие их об этих вещах, и для запроса. Для каждого результата они создают субъект фильтра для каждого фильтра в списке, отправляют каждому из них список сборщиков и объединитель, а затем отправляют им результат.
Актеры фильтра хранят список сборщиков и ссылку на консолидатор. Они слушают сообщения, информирующие их об этих вещах и о результатах поиска. Они отправляют свои выходные данные, если таковые имеются, вновь созданным актерам сборщика, которые сначала получают информацию о консолидаторе.
Сборщики сохраняют ссылку на консолидаторов. Они слушают сообщение, информирующее их об этой ссылке, и о результате из фильтра. Они, в свою очередь, отправляют свой результат в консолидатор.
Консолидатор прослушивает два сообщения. Одно сообщение от актеров сборщика информирует их о результатах, которые они накапливают. Другое сообщение, поступающее из запроса, запрашивает тот результат, который он возвращает.
Осталось только придумать способ сообщить консолидатору, что все результаты обработаны. Один из способов будет следующим:
- В Запросе сообщайте актеру Консолидатора о каждом Созданном Искателе. Консолидатор хранит их список с флагом, указывающим, завершены они или нет.
- Каждый поисковик хранит список фильтров, которые он создал, и ожидает от них сообщения «сделано». Когда у поисковика не осталось обработки, и он получил «выполнено» от всех фильтров, он отправляет сообщение в консолидатор, информирующее его о завершении.
- Каждый фильтр, в свою очередь, хранит список средств извлечения , которые он создал, и аналогичным образом ожидает от них "готовых" сообщений. Когда он завершил обработку и получил «выполнено» от всех сборщиков, он сообщает поисковику, что он сделал.
- Сборщик отправляет сообщение «готово» фильтру, который его создал, когда его работа завершена и отправлена в консолидатор.
- Консолидатор прослушивает сообщение, запрашивающее результат только после того, как получил "сделано" от всех поисковиков.