Как я могу использовать параллельное программирование Scala для параллельных вычислений? - PullRequest
2 голосов
/ 13 декабря 2010

Я пытаюсь использовать Scala для нахождения параметра функции, которая выдает наибольшее возвращаемое значение, и я хотел бы сделать это параллельно.Поэтому для этой функции:

def f(i: Long): Double = {
  // do something with i and return a double
}  

Я хочу найти входной параметр i в диапазоне (0, x), который дает максимальное значение при передаче в функцию f.Это то, что я до сих пор:

import scala.concurrent.ops._

def parMap(f: Long => (Double, Long), xs: List[Int]): Array[(Double, Long)] = {
  val results = new Array[(Double, Long)](xs.length)
  replicate(0, xs.length) { i => results(i) = f(xs(i)) }
  results
}

var results = parMap(i => (f(i), i), List.range(0, i)).max

Это может работать правильно, но я получаю java.lang.OutOfMemoryError: Ошибка пространства кучи Java.Для задачи, над которой я работаю, весь набор результатов будет слишком большим, чтобы поместиться в памяти, поэтому необходимо отбросить результаты, которые хуже, чем те, которые мы видели до сих пор.Если я сделаю диапазон списка достаточно маленьким, чтобы он поместился в памяти, мой массив результатов (до вызова метода max) выглядит примерно так:

Array(null, null, (-Infinity,2), (-Infinity,3), null, (-Infinity,5), (-Infinity,6), (-Infinity,7), (-Infinity,8), (-22184.3237904591,9), null, (-22137.315048628963,11)...

Значения -Infinity являются нормальными для чегоЯ делаю, но нули нет.Каждый раз, когда я запускаю его, я получаю разные нули, так что это случайно.Это похоже на то, что метод репликации «отказывается» от некоторых вызовов функций и вместо этого дает ноль.

Примечание. Я использую Scala 2.8.1.

Кроме того, мне кажется, точная документацияна Scala и параллельных вычислений трудно найти.Я хотел бы узнать больше, чтобы я мог самостоятельно разобраться с подобными проблемами.Кто-нибудь может предложить надежный ресурс, из которого я могу поучиться?

Ответы [ 2 ]

4 голосов
/ 13 декабря 2010

Я не совсем в курсе всех параллельных коллекций 2.9, и я не уверен, что concurrent.ops так хорошо поддерживается, но мне кажется, что ваша задача идеально подходит для фьючерсов в 2.8:

// Setup--you want to use longs, so you can't use range
val x = 4000000000L  // Note that this doesn't fit in a signed integer
def f(l: Long) = l + 8e9/(3+l)
def longRange(a: Long, b: Long) = new Iterator[Long] {
  private[this] var i = a
  def hasNext = i<b
  def next = { val j = i; i += 1; j }
}

val cpus = 4
val ranges = (1 to cpus).map(i => longRange(((i-1)*x)/cpus, (i*x)/cpus))
val maxes = ranges.map(r => scala.actors.Futures.future(r.map(f).max))
println("Total max is " + maxes.map(_()).max)

Здесь вы разделяете работу вручную и запрашиваете вычисление максимального значения для каждой части диапазона, которое предоставляется итератором по требованию.Они вычисляются в будущем, то есть Futures.future возвращает обещание, что оно в конечном итоге доставит возвращаемое значение.Обещание фактически выполняется, когда вызывается myFuture.apply(), в данном случае это _() внутри println.Чтобы получить общий максимум, вы должны взять максимум максимумов, и это, конечно, не может вернуться, пока вся работа, отложенная на будущее, фактически не будет завершена.

Вы можете попробовать сравнить время выполненияЧетырехпоточные и однопоточные версии, если вы хотите убедиться, что они работают.

(Обратите внимание, что ответ для предоставленной мной функции должен быть 4.000000001e9.)

Обратите внимание, чтоесли вы действительно хотите, чтобы все работало быстро, вам, вероятно, следует написать свои собственные тесты диапазона:

def maxAppliedRange(a: Long, b: Long, f: Long=>Double) = {
  var m = f(a)
  var i = a
  while (i < b) {
    val x = f(i)
    if (m < x) m = x
    i += 1
  }
  m
}
val maxes = (1 to cpus).map(i => 
  scala.actors.Futures.future( maxAppliedRange((i-1)*x/cpus,i*x/cpus,f) )
)
println("Total max is " + maxes.map(_()).max)

Это дает лучшую производительность, потому что нет никакой блокировки / распаковки, и, следовательно, сборщик мусора не загружен,и, таким образом, параллельная работа дает гораздо лучшие результаты.Для меня это работает примерно в 40 раз быстрее, чем описанный выше, и обратите внимание, что это также будет верно для параллельных коллекций.Так что будьте осторожны!Использование большего количества ядер не обязательно является способом ускорения ваших вычислений, особенно когда вы выполняете сложную задачу.

0 голосов
/ 13 декабря 2010

Я думаю, вы могли бы сделать это кратко, используя futures, но также используя глобальный пул потоков актера.В соответствии с вашим исходным примером:

import scala.actors.Futures._

def parMap(f: Long => (Double,Long), xs: List[Int]) : Array[(Double,Long)] = {
  val results = new Array[(Double, Long)](xs.length)
  val futures = (0 until xs.length).map { i =>
    future { results(i) = f(xs(i)) }
  }
  futures.foreach(_())
  results
}

приводит к:

scala> parMap(l => (l.toDouble,l), List(1,2,3))
res2: Array[(Double, Long)] = Array((1.0,1), (2.0,2), (3.0,3))

Это распараллелит работу, которая будет выполнена.Если вы хотите оптимизировать его под количество процессоров, которое у вас есть, вы можете установить размер пула акторов с помощью свойствctors.corePoolSize иctors.maxPoolSize.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...