Как разделить поток Fs2 по ключу, чтобы преобразовать каждый раздел отдельно? - PullRequest
0 голосов
/ 21 декабря 2018

Чего я хочу достичь, например , данные:

time, part, data
0, a, 3
1, a, 4
2, b, 10
3, b, 20
3, a, 5

и преобразование:

stream.keyBy(_.part).scan(0)((s, d) => s + d)

получить:

0, a, 3
1, a, 7
2, b, 10
3, b, 30
3, a, 12

Я попытался разбить его, используя groupAdjacentBy, но это становится слишком сложным, потому что мне нужно сохранять сложное состояние между каждым чанком с ключом.Интересно, есть ли что-то похожее на Flink DataStream. keyBy ?Или более простой способ реализовать это?

Ответы [ 2 ]

0 голосов
/ 02 февраля 2019

Проблема, как указано, может быть решена путем «разбиения» в самой операции сканирования:

import cats.implicits._
import cats.effect.IO
import fs2._

case class Element(time: Long, part: Symbol, value: Int)

val elements = Stream(
  Element(0, 'a, 3),
  Element(1, 'a, 4),
  Element(2, 'b, 10),
  Element(3, 'b, 20),
  Element(3, 'a, 5)
)

val runningSumsByPart = elements
  .scan(Map.empty[Symbol, Int] -> none[Element]) {
    case ((sums, _), el@Element(_, part, value)) =>
      val sum = sums.getOrElse(part, 0) + value
      (sums + (part -> sum), el.copy(value = sum).some)
  }
  .collect { case (_, Some(el)) => el }

runningSumsByPart.covary[IO].evalTap(el => IO { println(el) }).compile.drain.unsafeRunSync()

Выходы:

Элемент (0, 'a, 3)

Элемент (1, 'a, 7)

Элемент (2,' b, 10)

Элемент (3, 'b, 30)

Элемент (3, 'a, 12)

0 голосов
/ 24 декабря 2018

ОК, я нашел интересное решение (однако не может быть flatten)

...