преобразование генератор / блок в итератор / поток - PullRequest
14 голосов
/ 26 сентября 2010

В основном я хочу преобразовать это:

def data(block: T => Unit)

в поток (dataToStream - гипотетическая функция, которая выполняет это преобразование):

val dataStream: Stream[T] = dataToStream(data)

Я полагаю, эта проблема может быть решенапо продолжениям:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it
dataList.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here a black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know how dataToStream function could look like?

Спасибо, Давид

Ответы [ 4 ]

9 голосов
/ 28 сентября 2010

РЕДАКТИРОВАНИЕ: Изменены примеры, чтобы показать лень traversable.view

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

Метод toTraversable преобразует вашу функцию данных в коллекцию Traversable.Само по себе это ничего особенного, но вы можете преобразовать это в TraversableView, который ленив.Вот пример:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

Неудачная природа метода take заключается в том, что он должен пройти один раз после последнего сгенерированного значения, чтобы работать правильно, но он завершится рано.Приведенный выше код будет выглядеть так же без вызова ".view".Тем не менее, вот более убедительный пример:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3

Итак, в заключение, я считаю, что вы ищете коллекцию TraversableView, которая является наиболее простой для создания представления, создающего Traversable, а затем вызывающего «view» для него.,Если вам действительно нужен тип Stream, вот метод, который работает в 2.8.0.final и создаст «поток» без потоков:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

К сожалению, этот метод будет повторятьсявесь пройденный прежде чем сделать поток.Это также означает, что все значения должны быть помещены в буфер.Единственная альтернатива - прибегать к потокам.

В качестве отступления: это было мотивирующей причиной, чтобы предпочесть Traversables в качестве прямого возврата от методов scalax.io.File: "lines", chars "и" bytes ".

3 голосов
/ 26 сентября 2010

Вот простое решение, которое порождает поток, который потребляет данные.Он отправляет данные в SynchronousQueue.Поток данных извлечения из очереди создается и возвращается:

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   
2 голосов
/ 27 сентября 2010

Вот реализация с разделителями, основанная на продолжениях, адаптированная из предложения @Geoff Reedy:

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}
2 голосов
/ 26 сентября 2010

Я все еще должен выяснить, как сделать это сам. Я подозреваю, что ответ лежит где-то здесь:

Редактировать: удален код, показывающий, как решить другую проблему.

Edit2: Используя код http://gist.github.com/580157, который был первоначально размещен http://gist.github.com/574873,, вы можете сделать это:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data не принимает код блока, но я думаю, что это нормально, потому что с продолжением блок может быть обработан вызывающей стороной. Код для генератора можно увидеть в гист на github.

...