Функциональная обработка потоков Scala без ошибок OutOfMemory - PullRequest
20 голосов
/ 09 ноября 2010

Можно ли применить функциональное программирование к потокам Scala таким образом, чтобы поток обрабатывался последовательно, но уже обработанная часть потока могла быть собрана сборщиком мусора?

Например, я определяю Stream,содержит числа от start до end:

def fromToStream(start: Int, end: Int) : Stream[Int] = {
  if (end < start) Stream.empty
  else start #:: fromToStream(start+1, end)
}

Если я суммирую значения в функциональном стиле:

println(fromToStream(1,10000000).reduceLeft(_+_))

Я получу OutOfMemoryError - возможно, скадр стека вызова reduceLeft содержит ссылку на начало потока.Но если я делаю это в итеративном стиле, это работает:

var sum = 0
for (i <- fromToStream(1,10000000)) {
  sum += i
}

Есть ли способ сделать это в функциональном стиле без получения OutOfMemory?

UPDATE : ошибка в scala , которая исправлена.Так что это более или менее устарело.

Ответы [ 4 ]

19 голосов
/ 09 ноября 2010

Когда я начал изучать Stream, я подумал, что это круто. Тогда я понял, что Iterator - это то, что я хочу использовать почти все время.

Если вам нужно Stream, но вы хотите, чтобы reduceLeft работало:

fromToStream(1,10000000).toIterator.reduceLeft(_ + _)

Если вы попробуете строку выше, сборщик мусора будет работать очень хорошо. Я обнаружил, что использовать Stream сложно, так как легко удержать голову, не осознавая этого. Иногда стандартная библиотека будет держаться за вас - очень тонкими способами.

13 голосов
/ 09 ноября 2010

Да, вы можете. Хитрость заключается в том, чтобы использовать хвостовые рекурсивные методы, чтобы кадр локального стека содержал единственную ссылку на экземпляр Stream. Так как метод является хвостово-рекурсивным, локальная ссылка на предыдущую головку Stream будет стерта, как только он рекурсивно вызовет себя, что позволит GC собирать начало Stream по мере продвижения.

Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import collection.immutable.Stream
import collection.immutable.Stream

scala> import annotation.tailrec
import annotation.tailrec

scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail)
last: (s: scala.collection.immutable.Stream[Int])Int

scala> last(Stream.range(0, 100000000))                                                                             
res2: Int = 99999999

Кроме того, вы должны убедиться, что объект, который вы передаете методу last выше, имеет только одну ссылку в стеке. Если вы сохраняете Stream в локальной переменной или значении, он не будет собирать мусор при вызове метода last, поскольку его аргумент - не единственная ссылка, оставленная на Stream. В приведенном ниже коде не хватает памяти.

scala> val s = Stream.range(0, 100000000)                                                                           
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)                                                            

scala> last(s)                                                                                                      
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space                                              
        at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)                                                      
        at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)                                     
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)                                       
        at sun.misc.URLClassPath.getResource(URLClassPath.java:169)                                                 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:194)                                                   
        at java.security.AccessController.doPrivileged(Native Method)                                               
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)                                               
        at java.lang.ClassLoader.loadClass(ClassLoader.java:307)                                                    
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)                                            
        at java.lang.ClassLoader.loadClass(ClassLoader.java:248)                                                    
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:978)                      
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:976)                      
        at scala.util.control.Exception$Catch.apply(Exception.scala:80)
        at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)                                    
        at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)                                       
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)                                             
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576)
        at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)                             
        at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)                         
        at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362)
        at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243)
        at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249)
        at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559)
        at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

Подведем итог:

  1. Использовать хвостовые рекурсивные методы
  2. Аннотировать их как хвост-рекурсивные
  3. Когда вы вызываете их, убедитесь, что их аргумент является единственной ссылкой на Stream

EDIT:

Обратите внимание, что это также работает и не приводит к ошибке нехватки памяти:

scala> def s = Stream.range(0, 100000000)                                                   
s: scala.collection.immutable.Stream[Int]

scala> last(s)                                                                              
res1: Int = 99999999

EDIT2:

И в случае reduceLeft, который вам требуется, вы должны определить вспомогательный метод с аргументом аккумулятора для результата.

Для reduLeft вам нужен аргумент-накопитель, который можно установить на определенное значение, используя аргументы по умолчанию. Упрощенный пример:

scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head)
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int

scala> rcl(Stream.range(0, 10000000))
res6: Int = -2014260032
2 голосов
/ 10 ноября 2010

Как оказалось, это ошибка в текущей реализации reduLeft. Проблема состоит в том, что reduLeft вызывает foldLeft, и, таким образом, стековый фрейм reduLeft содержит ссылку на заголовок потока в течение всего вызова. foldLeft использует хвостовую рекурсию, чтобы избежать этой проблемы. Для сравнения:

(1 to 10000000).toStream.foldLeft(0)(_+_)
(1 to 10000000).toStream.reduceLeft(_+_)

Это семантически эквивалентно. В Scala версии 2.8.0 работает вызов foldLeft, но при вызове lowerLeft выбрасывается OutOfMemory. Если бы ReduLeft выполнял свою собственную работу, эта проблема не возникала бы.

2 голосов
/ 09 ноября 2010

Возможно, вы захотите взглянуть на эфемерные потоки Скалаза .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...