Все актеры в основном являются потоками, которые выполняются планировщиком под капотом.Планировщик создает пул потоков для выполнения актеров, грубо связанных с вашим числом ядер.Это означает, что вы можете просто создать актера для каждой задачи, которую нужно выполнить, и оставить все остальное для Scala:
for(i <- 1 to 20) {
actor {
print(i);
Thread.sleep(1000);
}
}
Недостатком здесь является то, что количество задач зависит от стоимости создания потока для каждой задачи.Задача может быть довольно дорогой, поскольку в Java потоки не так дешевы.
Простой способ создать ограниченный пул рабочих актеров и затем распределить задачи по ним через обмен сообщениями будет выглядеть примерно так:
import scala.actors.Actor._
val numWorkers = 4
val pool = (1 to numWorkers).map { i =>
actor {
loop {
react {
case x: String => println(x)
}
}
}
}
for(i <- 1 to 20) {
val r = (new util.Random).nextInt(numWorkers)
pool(r) ! "task "+i
}
Причина, по которой мы хотим создать несколько акторов, заключается в том, что один актер обрабатывает только одно сообщение (т.е. задачу) за раз, поэтому для получения параллелизма для ваших задач вам необходимо создать несколько.
Примечание.: планировщик по умолчанию становится особенно важным, когда речь идет о задачах, связанных с вводом / выводом, поскольку в этом случае вы определенно захотите изменить размер пула потоков.Два хороших поста в блоге, в которых подробно рассказывается об этом: Изучите планирование актеров Scala и Подводный камень потока актеров Scala .
С учетом сказанного, Akka - это платформа Actor, которая предоставляет инструменты для более сложных рабочих процессов с Actors, и это то, что я бы использовал в любом реальном приложении.Вот исполнитель задачи балансировки нагрузки (а не случайный):
import akka.actor.Actor
import Actor._
import akka.routing.{LoadBalancer, CyclicIterator}
class TaskHandler extends Actor {
def receive = {
case t: Task =>
// some computationally expensive thing
t.execute
case _ => println("default case is required in Akka...")
}
}
class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer {
val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start())
val seq = new CyclicIterator(workerPool)
}
val router = actorOf(new TaskRouter(4)).start()
for(i <- 1 to 20) {
router ! Task(..)
}
Вы можете иметь различные типы балансировки нагрузки (CyclicIterator является циклическим распределением), поэтому вы можете проверить документы здесь для получения дополнительной информации.