Потребление памяти параллельным Scala Stream - PullRequest
6 голосов
/ 22 марта 2012

Я написал приложение Scala (2.9.1-1), которое должно обработать несколько миллионов строк из запроса к базе данных. Я конвертирую ResultSet в Stream, используя технику, показанную в ответе на один из моих предыдущих вопросов :

class Record(...)

val resultSet = statement.executeQuery(...)

new Iterator[Record] {
  def hasNext = resultSet.next()
  def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...)
}.toStream.foreach { record => ... }

и это сработало очень хорошо.

Поскольку тело замыкания foreach очень сильно загружает процессор, и в качестве подтверждения практичности функционального программирования, если я добавлю .par перед foreach, замыкания будут выполняться параллельно с другими усилие, за исключением того, чтобы убедиться, что тело замыкания является потокобезопасным (оно написано в функциональном стиле без изменяемых данных, кроме печати в поточно-безопасный журнал).

Однако меня беспокоит потребление памяти. .par вызывает загрузку всего набора результатов в ОЗУ или параллельная операция загружает столько строк, сколько у нее активных потоков? Я выделил 4G для JVM (64-разрядная с -Xmx4g), но в будущем я буду запускать его на еще большем количестве строк и беспокоиться о том, что со временем у меня будет недостаточно памяти.

Существует ли лучшая модель для выполнения такой параллельной обработки функциональным образом? Я показываю это приложение моим коллегам в качестве примера ценности функционального программирования и многоядерных машин.

Ответы [ 2 ]

4 голосов
/ 22 марта 2012

Если вы посмотрите на скаладок Stream, вы заметите, что класс определения par является чертой Parallelizable ... и, если вы посмотрите на Исходный код этой черты , вы заметите, что он берет каждый элемент из исходной коллекции и помещает их в объединитель, таким образом, вы загружаете каждую строку в ParSeq:

  def par: ParRepr = {
    val cb = parCombiner
    for (x <- seq) cb += x
    cb.result
  }

  /** The default `par` implementation uses the combiner provided by this method
   *  to create a new parallel collection.
   *
   *  @return  a combiner for the parallel collection of type `ParRepr`
   */
  protected[this] def parCombiner: Combiner[A, ParRepr]

Возможным решением является явное распараллеливание вычислений, например, благодаря актерам. Вы можете взглянуть на этот пример из документации akka, например, это может быть полезно в вашем контексте.

0 голосов
/ 16 октября 2015

Новая библиотека akka stream - это исправление, которое вы ищете:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}

def iterFromQuery() : Iterator[Record] = {
  val resultSet = statement.executeQuery(...)
  new Iterator[Record] {
    def hasNext = resultSet.next()
    def next = new Record(...)
  }
}

def cpuIntensiveFunction(record : Record) = {
...
}

implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val execContext = actorSystem.dispatcher

val poolSize = 10 //number of Records in memory at once

val stream = 
  Source(iterFromQuery).runWith(Sink.foreachParallel(poolSize)(cpuIntensiveFunction))

stream onComplete {_ => actorSystem.shutdown()}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...