Как я могу выполнить несколько задач в Scala? - PullRequest
41 голосов
/ 22 декабря 2010

У меня 50 000 задач, и я хочу выполнить их с 10 потоками. В Java я должен создать Executers.threadPool (10) и передать runnable для затем ждать обработки всех. Scala, как я понимаю, особенно полезен для этой задачи, но я не могу найти решение в документации.

Ответы [ 4 ]

58 голосов
/ 22 декабря 2010

Scala 2.9.3 и более поздние версии

Самый простой подход - использовать класс scala.concurrent.Future и связанную с ним инфраструктуру. Метод scala.concurrent.future асинхронно оценивает переданный ему блок и немедленно возвращает Future[A], представляющий асинхронное вычисление. Фьючерсами можно манипулировать несколькими неблокирующими способами, включая отображение, flatMapping, фильтрацию, восстановление ошибок и т. Д.

Например, вот пример, который создает 10 задач, где каждая задача спит произвольное количество времени, а затем возвращает квадрат значения, переданного ей.

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val tasks: Seq[Future[Int]] = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}

val aggregated: Future[Seq[Int]] = Future.sequence(tasks)

val squares: Seq[Int] = Await.result(aggregated, 15.seconds)
println("Squares: " + squares)

В этом примере мы сначала создаем последовательность отдельных асинхронных задач, которые после завершения предоставляют int. Затем мы используем Future.sequence, чтобы объединить эти асинхронные задачи в одну асинхронную задачу - поменять местами Future и Seq в типе. Наконец, мы блокируем текущий поток до 15 секунд, ожидая результата. В этом примере мы используем глобальный контекст выполнения, который поддерживается пулом потоков fork / join. Для нетривиальных примеров вы, вероятно, захотите использовать приложение ExecutionContext.

Как правило, следует избегать блокировок, когда это возможно. В классе Future доступны другие комбинаторы, которые могут помочь программировать в асинхронном стиле, включая onSuccess, onFailure и onComplete.

Также рассмотрите возможность изучения библиотеки Akka , которая обеспечивает параллелизм на основе акторов для Scala и Java и взаимодействует с scala.concurrent.

Scala 2.9.2 и ранее

Этот самый простой подход заключается в использовании класса Future Scala, который является подкомпонентом платформы Actors. Метод scala.actors.Futures.future создает Future для переданного ему блока. Затем вы можете использовать scala.actors.Futures.awaitAll, чтобы дождаться завершения всех задач.

Например, вот пример, который создает 10 задач, где каждая задача спит произвольное количество времени, а затем возвращает квадрат значения, переданного ей.

import scala.actors.Futures._

val tasks = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}

val squares = awaitAll(20000L, tasks: _*)
println("Squares: " + squares)
16 голосов
/ 22 декабря 2010

Вы хотите посмотреть либо библиотеку актеров Scala, либо Akka. У Akka более чистый синтаксис, но любой из них поможет.

Похоже, вам нужно создать группу актеров, которые знают, как обрабатывать ваши задачи. Актером может быть любой класс с методом получения - из учебника Akka (http://doc.akkasource.org/tutorial-chat-server-scala):

class MyActor extends Actor {
  def receive = {
    case "test" => println("received test")
    case _ =>      println("received unknown message")
 }}

val myActor = Actor.actorOf[MyActor]
myActor.start

Вы захотите создать пул экземпляров актеров и отправлять им свои задания в виде сообщений. Вот сообщение о пуле актеров Akka, которое может быть полезным: http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/

В вашем случае может подойти по одному действующему субъекту на задачу (акторы очень легкие по сравнению с потоками, поэтому вы можете иметь их ОДНО в одной виртуальной машине), или вам может потребоваться более сложное распределение нагрузки между ними.

EDIT: Используя приведенный выше пример актера, отправить ему сообщение так же просто, как это:

myActor ! "test"

Затем актер выводит «полученный тест» на стандартный вывод.

Сообщения могут быть любого типа, и в сочетании с сопоставлением с шаблоном Scala у вас есть мощный шаблон для создания гибких параллельных приложений.

В общем, актеры Akka будут «поступать правильно» с точки зрения совместного использования потоков, и для нужд ОП, я думаю, по умолчанию все в порядке. Но если вам нужно, вы можете установить диспетчер, который должен использовать актер, в один из нескольких типов:

* Thread-based
* Event-based
* Work-stealing
* HawtDispatch-based event-driven

Установить диспетчера для актера тривиально:

class MyActor extends Actor {
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("thread-pool-dispatch")
    .withNewThreadPoolWithBoundedBlockingQueue(100)
    .setCorePoolSize(10)
    .setMaxPoolSize(10)
    .setKeepAliveTimeInMillis(10000)
    .build
}

См. http://doc.akkasource.org/dispatchers-scala

Таким образом, вы могли бы ограничить размер пула потоков, но, опять же, исходный вариант использования, вероятно, мог бы быть удовлетворен с экземплярами актера Akka 50 КБ, использующими диспетчеры по умолчанию, и он хорошо распараллелился бы.

Это действительно только царапает поверхность того, что может сделать Акка. Это приносит много того, что Эрланг предлагает языку Scala. Актеры могут отслеживать других актеров и перезапускать их, создавая приложения для самовосстановления. Akka также предоставляет программную транзакционную память и многие другие функции. Возможно, это «приложение-убийца» или «среда-убийца» для Scala.

8 голосов
/ 07 октября 2015

Вот еще один ответ, аналогичный ответу mpilquist, но без устаревшего API и включающий настройки потока через пользовательский ExecutionContext:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._

val numJobs = 50000
var numThreads = 10

// customize the execution context to use the specified number of threads
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads))


// define the tasks
val tasks = for (i <- 1 to numJobs) yield Future {
  // do something more fancy here
  i
}

// aggregate and wait for final result
val aggregated = Future.sequence(tasks)
val oneToNSum = Await.result(aggregated, 15.seconds).sum
8 голосов
/ 22 декабря 2010

Если вы хотите «выполнить их с 10 потоками», используйте потоки.Модель актера в Scala, о которой обычно говорят люди, когда говорят, что Scala хороша для параллелизма, скрывает таких деталей, чтобы вы их не видели.

Использование актеров или будущего свсе, что у вас есть, - это простые вычисления, вы просто создадите 50000 из них и дадите им работать.Вы можете столкнуться с проблемами, но они имеют другую природу.

...