Обработка одновременно в Scala - PullRequest
8 голосов
/ 17 июня 2009

Как в моем собственном ответе на мой собственный вопрос , у меня возникла ситуация, когда я обрабатываю большое количество событий, которые поступают в очередь. Каждое событие обрабатывается точно таким же образом, и каждое событие может обрабатываться независимо от всех других событий.

Моя программа использует платформу параллелизма Scala, и многие из задействованных процессов смоделированы как Actor s. Поскольку Actor s обрабатывают свои сообщения последовательно, они не подходят для этой конкретной проблемы (хотя мои другие актеры выполняют действия, которые являются последовательными). Поскольку я хочу, чтобы Scala «контролировала» все процессы создания потоков (что, я полагаю, является целью создания системы параллелизма в первую очередь), похоже, у меня есть 2 варианта:

  1. Отправка событий в пул обработчиков событий, которыми я управляю
  2. заставить мой Actor обрабатывать их одновременно с помощью другого механизма

Я бы подумал, что # 1 сводит на нет смысл использования подсистемы актеров: сколько акторов процессора я должен создать? - один очевидный вопрос. Эти вещи предположительно скрыты от меня и решаются подсистемой.

Мой ответ был следующим:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Есть ли лучший подход? Это неправильно?

edit: возможно лучший подход:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}

Ответы [ 5 ]

8 голосов
/ 18 июня 2009

Это похоже на дубликат другого вопроса. Поэтому я продублирую свой ответ

Актеры обрабатывают одно сообщение за раз. Классический шаблон для обработки нескольких сообщений - наличие одного фронта координатора для группы потребителей. Если вы используете реагировать, то пул потребителей может быть большим, но все равно будет использовать только небольшое количество потоков JVM. Вот пример, где я создаю пул из 10 потребителей и одного координатора для них.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

Этот код проверяет, какой потребитель доступен, и отправляет запрос этому потребителю. Альтернативы - просто случайным образом назначать потребителям или использовать планировщик циклического перебора.

В зависимости от того, что вы делаете, вам может быть лучше подано с Scala's Futures. Например, если вам не нужны актеры, тогда все вышеперечисленные механизмы можно записать как

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
3 голосов
/ 17 июня 2009

Если все события могут обрабатываться независимо, почему они находятся в очереди? Не зная ничего о вашем дизайне, это кажется ненужным шагом. Если бы вы могли составить функцию process с тем, что запускает эти события, вы могли бы потенциально избавиться от очереди.

Актер, по сути, является параллельным эффектом, снабженным очередью. Если вы хотите обрабатывать несколько сообщений одновременно, вам не нужен актер. Вы просто хотите, чтобы функция (Any => ()) была запланирована для выполнения в удобное время.

Сказав это, ваш подход разумен , если вы хотите остаться в библиотеке актеров и если очередь событий не находится под вашим контролем.

Scalaz проводит различие между актерами и одновременными эффектами. В то время как Actor очень легкий, scalaz.concurrent.Effect еще легче. Вот ваш код, примерно переведенный в библиотеку Scalaz:

val eventProcessor = effect (x => process x)

Это последняя версия ствола, еще не выпущенная.

1 голос
/ 20 мая 2011

Акторы намного легче, чем потоки, и, как таковой, еще один вариант - использовать объекты акторов, такие как объекты Runnable, которые вы использовали для отправки в пул потоков. Основное отличие состоит в том, что вам не нужно беспокоиться о ThreadPool - пул потоков управляется платформой актера и в основном является проблемой конфигурации.

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

Затем, чтобы отправить сообщение, скажите это:

submit(new MyEvent(x))

, что соответствует

eventProcessor ! new MyEvent(x)

от вашего вопроса.

Успешно протестирован этот шаблон: за четыре секунды на четырехъядерный ноутбук i7 было отправлено и получено 1 миллион сообщений.

Надеюсь, это поможет.

1 голос
/ 17 июня 2009

Назначение субъекта (ну, в общем, одного из них) состоит в том, чтобы гарантировать, что состояние внутри субъекта может быть доступно только одному потоку за раз.Если обработка сообщения не зависит от какого-либо изменяемого состояния в субъекте, то, вероятно, было бы более уместным просто передать задачу планировщику или пулу потоков для обработки.Дополнительная абстракция, которую предоставляет актер, на самом деле мешает вам.

В scala.actors.Scheduler есть удобные методы, или вы можете использовать Executor из java.util.concurrent.

1 голос
/ 17 июня 2009

Это звучит как простая проблема потребителя / производителя. Я бы использовал очередь с пулом потребителей. Вы могли бы написать это с помощью нескольких строк кода, используя java.util.concurrent.

...