Как сгруппировать объекты с помощью функции классификатора в FS2? - PullRequest
0 голосов
/ 23 июня 2018

У меня есть поток неупорядоченных measurements, которые я хотел бы сгруппировать в пакеты фиксированного размера, чтобы я мог эффективно сохранить их позже:

val measurements = for {
  id <- Seq("foo", "bar", "baz")
  value <- 1 to 5
} yield (id, value)

fs2.Stream.emits(scala.util.Random.shuffle(measurements)).toVector

То есть вместо:

(bar,4)
(foo,5)
(baz,3)
(baz,5)
(baz,4)
(foo,2)
(bar,2)
(foo,4)
(baz,1)
(foo,1)
(foo,3)
(bar,1)
(bar,5)
(bar,3)
(baz,2)

Я хотел бы иметь следующую структуру для размера пакета, равного 3:

(bar,[4,2,1])
(foo,[5,2,4])
(baz,[3,5,4])
(baz,[1,2])
(foo,[1,3])
(bar,[5,3])

Есть ли простой идиоматический способ добиться этого в FS2? Я знаю, что есть функция groupAdjacentBy , но она будет учитывать только соседние элементы.

Я сейчас на 0.10.5.

1 Ответ

0 голосов
/ 06 февраля 2019

Этого можно достичь с помощью fs2 Pull:

import cats.data.{NonEmptyList => Nel}
import fs2._

object GroupingByKey {
  def groupByKey[F[_], K, V](limit: Int): Pipe[F, (K, V), (K, Nel[V])] = {
    require(limit >= 1)

    def go(state: Map[K, List[V]]): Stream[F, (K, V)] => Pull[F, (K, Nel[V]), Unit] = _.pull.uncons1.flatMap {
      case Some(((key, num), tail)) =>
        val prev = state.getOrElse(key, Nil)
        if (prev.size == limit - 1) {
          val group = Nel.ofInitLast(prev.reverse, num)
          Pull.output1(key -> group) >> go(state - key)(tail)
        } else {
          go(state.updated(key, num :: prev))(tail)
        }
      case None =>
        val chunk = Chunk.vector {
          state
            .toVector
            .collect { case (key, last :: revInit) =>
              val group = Nel.ofInitLast(revInit.reverse, last)
              key -> group
            }
        }
        Pull.output(chunk) >> Pull.done
    }

    go(Map.empty)(_).stream
  }
}

Использование:

import cats.data.{NonEmptyList => Nel}
import cats.implicits._
import cats.effect.{ExitCode, IO, IOApp}
import fs2._

object Answer extends IOApp {
  type Key = String

  override def run(args: List[String]): IO[ExitCode] = {
    require {
      Stream('a -> 1).through(groupByKey(2)).compile.toList ==
        List('a -> Nel.one(1))
    }

    require {
      Stream('a -> 1, 'a -> 2).through(groupByKey(2)).compile.toList ==
        List('a -> Nel.of(1, 2))
    }

    require {
      Stream('a -> 1, 'a -> 2, 'a -> 3).through(groupByKey(2)).compile.toList ==
        List('a -> Nel.of(1, 2), 'a -> Nel.one(3))
    }

    val infinite = (for {
      prng <- Stream.eval(IO { new scala.util.Random() })
      keys <- Stream(Vector[Key]("a", "b", "c", "d", "e", "f", "g"))
      key = Stream.eval(IO {
        val i = prng.nextInt(keys.size)
        keys(i)
      })
      num = Stream.eval(IO { 1 + prng.nextInt(9) })
    } yield (key zip num).repeat).flatten

    infinite
      .through(groupByKey(3))
      .showLinesStdOut
      .compile
      .drain
      .as(ExitCode.Success)
  }
}
...