Scala: группируйте итерируемое в итерируемое из итерируемых по предикату - PullRequest
6 голосов
/ 22 ноября 2011

У меня есть очень большие итераторы, которые я хочу разбить на части.У меня есть предикат, который смотрит на элемент и возвращает истину, если это начало новой части.Мне нужно, чтобы кусочки были итераторами, потому что даже кусочки не помещаются в память.Есть так много кусочков, что я бы опасался, что рекурсивное решение взорвет ваш стек.Ситуация похожа на этот вопрос , но мне нужны итераторы вместо списков, и «часовые» (элементы, для которых предикат истинен) встречаются (и должны быть включены) в начале фрагмента.Получающиеся итераторы будут использоваться только по порядку, хотя некоторые могут вообще не использоваться, и они должны использовать только O (1) памяти.Я предполагаю, что это означает, что все они должны использовать один и тот же итератор.Производительность важна.

Если бы я сделал удар в сигнатуру функции, это было бы так:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = ...

Я бы хотел использовать takeWhile, но он теряетпоследний элемент.Я исследовал span, но это буферизирует результаты.Моя текущая лучшая идея включает BufferedIterator, но, возможно, есть лучший способ.

Вы будете знать, что вы поняли это правильно, потому что что-то вроде этого не приводит к краху вашей JVM:

groupby((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0).foreach(group => println(group.sum))
groupby((1 to Int.MaxValue).iterator)(_ % 10 == 0).foreach(group => println(group.sum))

Ответы [ 5 ]

5 голосов
/ 24 ноября 2011

Вот мое решение с использованием BufferedIterator. Он не позволяет правильно пропустить итераторы, но он довольно прост и функционален. Первый элемент (ы) входит в группу, даже если !startsGroup(first).

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
  new Iterator[Iterator[T]] {
    val base = iter.buffered
    override def hasNext = base.hasNext  
    override def next() = Iterator(base.next()) ++ new Iterator[T] {
      override def hasNext = base.hasNext && !startsGroup(base.head) 
      override def next() = if (hasNext) base.next() else Iterator.empty.next()
    }
  }

Обновление: сохранение небольшого состояния позволяет вам пропускать итераторы и не позволять людям возиться с предыдущими:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
new Iterator[Iterator[T]] {
  val base = iter.buffered
  var prev: Iterator[T] = Iterator.empty
  override def hasNext = base.hasNext  
  override def next() = {
    while (prev.hasNext) prev.next()        // Exhaust previous iterator; take* and drop* do NOT always work!!  (Jira SI-5002?)
    prev = Iterator(base.next()) ++ new Iterator[T] {
      var hasMore = true
      override def hasNext = { hasMore = hasMore && base.hasNext && !startsGroup(base.head) ; hasMore } 
      override def next() = if (hasNext) base.next() else Iterator.empty.next()
    }
    prev
  }
}
5 голосов
/ 23 ноября 2011

У вас есть внутренняя проблема.Iterable подразумевает, что вы можете получить несколько итераторов.Iterator подразумевает, что вы можете пройти только один раз.Это означает, что ваш Iterable[Iterable[T]] должен иметь возможность производить Iterator[Iterable[T]] с.Но когда он возвращает элемент - Iterable[T] - и запрашивает несколько итераторов, базовый отдельный итератор не может выполнить это без кэширования результатов списка (слишком большого) или , вызывающегоисходный, повторяемый и повторяющий абсолютно все снова (очень неэффективно).

Итак, хотя вы могли бы сделать это, я думаю, вы должны понимать свою проблему по-другому.

Если бы вместо этого вы могли начать с Seq, вы могли бы получить подмножества в качестве диапазонов.

Если вы уже знаете, как хотите использовать итерацию, вы можете написать метод

def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: T => Unit *)

, который увеличивается через набор обработчиков каждый раз, когда starts запускает "true".Если есть какой-то способ, которым вы можете выполнить свою обработку за один раз, то вот как это сделать.(Однако вашим обработчикам придется сохранять состояние через изменяемые структуры данных или переменные.)

Если вы можете разрешить итерации во внешнем списке, чтобы разбить внутренний список, вы можете получить Iterable[Iterator[T]] с дополнительным ограничениемчто после того, как вы перейдете к более позднему подитератору, все предыдущие подитераторы будут недействительными.


Вот решение последнего типа (от Iterator[T] до Iterator[Iterator[T]]; его можно обернуть всделать внешние слои Iterable вместо).

class GroupedBy[T](source: Iterator[T])(starts: T => Boolean)
extends Iterator[Iterator[T]] {
  private val underlying = source
  private var saved: T = _
  private var cached = false
  private var starting = false
  private def cacheNext() {
    saved = underlying.next
    starting = starts(saved)
    cached = true
  }
  private def oops() { throw new java.util.NoSuchElementException("empty iterator") }
  // Comment the next line if you do NOT want the first element to always start a group
  if (underlying.hasNext) { cacheNext(); starting = true }
  def hasNext = {
    while (!(cached && starting) && underlying.hasNext) cacheNext()
    cached && starting
  }
  def next = {
    if (!(cached && starting) && !hasNext) oops()
    starting = false
    new Iterator[T] {
      var presumablyMore = true
      def hasNext = {
        if (!cached && !starting && underlying.hasNext && presumablyMore) cacheNext()
        presumablyMore = cached && !starting
        presumablyMore
      }
      def next = {
        if (presumablyMore && (cached || hasNext)) { 
          cached = false
          saved
        }
        else oops()
      }
    }
  }
}
1 голос
/ 11 февраля 2012

Вы можете сохранить небольшой объем памяти, используя Streams. Используйте result.toIterator, если вы снова итератор.

С потоками нет изменяемого состояния, только одно условие, и оно почти такое же сжатое, как и решение Джея Хакера.

 def batchBy[A,B](iter: Iterator[A])(f: A => B): Stream[(B, Iterator[A])] = {
    val base = iter.buffered
    val empty = Stream.empty[(B,  Iterator[A])]

    def getBatch(key: B) = {
      Iterator(base.next()) ++ new Iterator[A] {
        def hasNext: Boolean = base.hasNext && (f(base.head) == key)
        def next(): A = base.next()
      }
    }

    def next(skipList: Option[Iterator[A]] = None): Stream[(B, Iterator[A])] = {
      skipList.foreach{_.foreach{_=>}}

      if (base.isEmpty) empty
      else {
        val key = f(base.head)
        val batch = getBatch(key)

        Stream.cons((key, batch), next(Some(batch)))
      }
    }

    next()
  }

Я провел тесты:

scala> batchBy((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0)
         .foreach{case(_,group) => println(group.sum)}
-1610612735
1073741823
-536870909
2147483646
2147483647

Второй тест печатает слишком много для вставки в переполнение стека.

1 голос
/ 23 ноября 2011

Если вы смотрите на ограничения памяти, то будет работать следующее. Вы можете использовать его, только если ваш базовый итерируемый объект поддерживает представления. Эта реализация будет перебирать Iterable, а затем генерировать IterableView, которые затем могут быть перебраны. Эта реализация не заботится, тестирует ли самый первый элемент как начальную группу, так как это будет независимо.

def groupby[T](iter: Iterable[T])(startsGroup: T => Boolean): Iterable[Iterable[T]] = new Iterable[Iterable[T]] {
  def iterator = new Iterator[Iterable[T]] {
    val i = iter.iterator
    var index = 0
    var nextView: IterableView[T, Iterable[T]] = getNextView()
    private def getNextView() = {
      val start = index
      var hitStartGroup = false
      while ( i.hasNext && ! hitStartGroup ) {
        val next = i.next()
        index += 1
        hitStartGroup = ( index > 1 && startsGroup( next ) )
      }
      if ( hitStartGroup ) {
        if ( start == 0 ) iter.view( start, index - 1 )
        else iter.view( start - 1, index - 1 )
      } else { // hit end
        if ( start == index ) null
        else if ( start == 0 ) iter.view( start, index )
        else iter.view( start - 1, index )
      }
    }
    def hasNext = nextView != null
    def next() = {
      if ( nextView != null ) {
        val next = nextView
        nextView = getNextView()
        next
      } else null
    }
  }
}
0 голосов
/ 28 августа 2014
import scala.collection.mutable.ArrayBuffer

object GroupingIterator {

  /**
   * Create a new GroupingIterator with a grouping predicate.
   *
   * @param it The original iterator
   * @param p Predicate controlling the grouping
   * @tparam A Type of elements iterated
   * @return A new GroupingIterator
   */
  def apply[A](it: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean): GroupingIterator[A] =
    new GroupingIterator(it)(p)
}

/**
 * Group elements in sequences of contiguous elements that satisfy a predicate. The predicate
 * tests each single potential next element of the group with the help of the elements grouped so far.
 * If it returns true, the potential next element is added to the group, otherwise
 * a new group is started with the potential next element as first element
 *
 * @param self The original iterator
 * @param p Predicate controlling the grouping
 * @tparam A Type of elements iterated
 */
class GroupingIterator[+A](self: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean) extends Iterator[IndexedSeq[A]] {

  private[this] val source = self.buffered
  private[this] val buffer: ArrayBuffer[A] = ArrayBuffer()

  def hasNext: Boolean = source.hasNext

  def next(): IndexedSeq[A] = {
    if (hasNext)
      nextGroup()
    else
      Iterator.empty.next()
  }

  private[this] def nextGroup(): IndexedSeq[A] = {
    assert(source.hasNext)

    buffer.clear()
    buffer += source.next

    while (source.hasNext && p(source.head, buffer)) {
      buffer += source.next
    }

    buffer.toIndexedSeq
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...