Конвертировать Traversable [T] в Stream [T] без обхода или переполнения стека - PullRequest
0 голосов
/ 04 июня 2018

Я использую библиотеку, которая предоставляет Traversable [T], который просматривает результаты базы данных.Я хотел бы избежать загрузки всего этого в память, поэтому я пытаюсь преобразовать его в поток [T].

Из того, что я могу сказать, встроенный метод "asStream" загружает весь Traversableв буфер, который побеждает мою цель.Моя попытка (ниже) наталкивается на StackOverflowException для больших результатов, и я не могу сказать, почему.Может ли кто-нибудь помочь мне понять, что происходит?Спасибо!

def asStream[T](traversable: => Traversable[T]): Stream[T] = {
  if (traversable.isEmpty) Empty
  else {
    lazy val head = traversable.head
    lazy val tail = asStream(traversable.tail)
    head #:: tail
  }
}

Вот полный пример, который воспроизводит это, основываясь на предложении @ SCouto

import scala.collection.immutable.Stream.Empty

object StreamTest {
  def main(args: Array[String]) = {
    val bigVector = Vector.fill(90000)(1)
    val optionStream = asStream(bigVector).map(v => Some(v))
    val zipped = optionStream.zipAll(optionStream.tail, None, None)
  }

  def asStream[T](traversable: => Traversable[T]): Stream[T] = {
    @annotation.tailrec
    def loop(processed: => Stream[T], pending: => Traversable[T]): Stream[T] = {
      if (pending.isEmpty) processed
      else {
        lazy val head = pending.head
        lazy val tail = pending.tail
        loop(processed :+ head, tail)
      }
    }

    loop(Empty, traversable)
  }
}

Редактировать: После некоторых интересных идей от @SCouto я узнал, что это может такжесделать с батутами, чтобы сохранить результат в виде потока [T], который находится в исходном порядке

object StreamTest {
  def main(args: Array[String]) = {
    val bigVector = Range(1, 90000).toVector
    val optionStream = asStream(bigVector).map(v => Some(v))
    val zipped = optionStream.zipAll(optionStream.tail, None, None)
    zipped.take(10).foreach(println)
  }

  def asStream[T](traversable: => Traversable[T]): Stream[T] = {
    sealed trait Traversal[+R]
    case class More[+R](result: R, next: () => Traversal[R]) extends Traversal[R]
    case object Done extends Traversal[Nothing]

    def next(currentTraversable: Traversable[T]): Traversal[T] = {
      if (currentTraversable.isEmpty) Done
      else More(currentTraversable.head, () => next(currentTraversable.tail))
    }

    def trampoline[R](body: => Traversal[R]): Stream[R] = {
      def loop(thunk: () => Traversal[R]): Stream[R] = {
        thunk.apply match {
          case More(result, next) => Stream.cons(result, loop(next))
          case Done => Stream.empty
        }
      }
      loop(() => body)
    }

    trampoline(next(traversable))
  }
}

Ответы [ 2 ]

0 голосов
/ 04 июня 2018

Stream не хранит данные в памяти, потому что вы объявляете, как генерировать каждый элемент.Весьма вероятно, что данные вашей базы данных не были сгенерированы процедурно, поэтому вам нужно получить данные при первом запросе (что-то вроде def getData(index: Int): Future[Data]).

Самая большая проблема возникает, так как выизвлекают данные из базы данных, вы, вероятно, используете Future s, поэтому, даже если вам удастся достичь этого, у вас будет Future[Stream[Data]] объект, который не очень удобен в использовании или, что еще хуже, блокирует его.

Не было бы гораздо более достойным просто разбить запрос на данные вашей базы данных?

0 голосов
/ 04 июня 2018

Попробуйте:

  def asStream[T](traversable: => Traversable[T]): Stream[T] = {

    @annotation.tailrec
    def loop(processed: Stream[T], pending: Traversable[T]): Stream[T] = {
      if (pending.isEmpty) processed
      else {
        lazy val head = pending.head
        lazy val tail = pending.tail
        loop(head #:: processed, tail)
      }
    }

    loop(Empty, traversable)
  }

Главное, чтобы рекурсивный вызов был последним действием вашей рекурсивной функции.

Чтобы убедиться в этом, вы можете использовать как вложенный метод (в примере называемый loop), так и аннотацию tailrec, которая гарантирует, что ваш метод безопасен для хвоста.

Вы можете найти информацию о tail rec здесь и в этом удивительном ответе здесь

EDIT Проблема заключалась в том, что мыдобавляли элемент в конце потока.Если вы добавите его в качестве заголовка Stream, как в вашем примере, он будет работать нормально.Я обновил свой код.Пожалуйста, проверьте его и сообщите нам результат.

Мои тесты:

scala> val optionStream = asStream(Vector.fill(90000)(1)).map(v => Some(v))
optionStream: scala.collection.immutable.Stream[Some[Int]] = Stream(Some(1), ?)

scala> val zipped = optionStream.zipAll(optionStream.tail, None, None)
zipped: scala.collection.immutable.Stream[(Option[Int], Option[Int])] = Stream((Some(1),Some(1)), ?)

EDIT2:

Согласно вашим комментариям и с учетомПример fpinscala, как вы сказали.Я думаю, что это может помочь вам.Дело в том, чтобы создать структуру класса дел с ленивой оценкой.Где голова представляет собой единый элемент, а хвост - проходимый

sealed trait myStream[+T] {
  def head: Option[T] = this match {
    case MyEmpty => None
    case MyCons(h, _) => Some(h())
  }


  def tail: myStream[T] = this match {
      case MyEmpty => MyEmpty
      case MyCons(_, t) => myStream.cons(t().head, t().tail)
    }
}
case object MyEmpty extends myStream[Nothing]
case class MyCons[+T](h: () => T, t: () => Traversable[T]) extends myStream[T]


object myStream {

  def cons[T](hd: => T, tl: => Traversable[T]): myStream[T] = {
    lazy val head = hd
    lazy val tail = tl

    MyCons(() => head, () => tail)
  }

  def empty[T]: myStream[T] = MyEmpty

  def apply[T](as: T*): myStream[T] = {
    if (as.isEmpty) empty
    else cons(as.head, as.tail)
  }
}

Некоторые Быстрые тесты:

  val bigVector = Vector.fill(90000)(1)
myStream.cons(bigVector.head, bigVector.tail)
res2: myStream[Int] = MyCons(<function0>,<function0>)

Извлечение головы:

res2.head
res3: Option[Int] = Some(1)

И хвост:

res2.tail
res4: myStream[Int] = MyCons(<function0>,<function0>)

РЕДАКТИРОВАТЬ3

Батутный раствор по указанию:

 def asStream[T](traversable: => Traversable[T]): Stream[T] = {
    sealed trait Traversal[+R]
    case class More[+R](result: R, next: () => Traversal[R]) extends Traversal[R]
    case object Done extends Traversal[Nothing]

    def next(currentTraversable: Traversable[T]): Traversal[T] = {
      if (currentTraversable.isEmpty) Done
      else More(currentTraversable.head, () => next(currentTraversable.tail))
    }

    def trampoline[R](body: => Traversal[R]): Stream[R] = {
      def loop(thunk: () => Traversal[R]): Stream[R] = {
        thunk.apply match {
          case More(result, next) => Stream.cons(result, loop(next))
          case Done => Stream.empty
        }
      }
      loop(() => body)
    }

    trampoline(next(traversable))
      }
    }
...