Изучение Акка Стримс. У меня есть поток записей, многие за единицу времени, уже упорядоченные по времени (из Slick), и я хочу объединить их в группы времени для обработки, определяя, когда изменяется временной шаг.
Пример
case class Record(time: Int, payload: String)
Если входящий поток равен
Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...
Я хотел бы преобразовать это в
Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...
Пока я нашел только группировкуфиксированным числом записей или разбиением на множество подпотоков, но с моей точки зрения мне не нужно несколько подпотоков.
Обновление: Я нашел batch
, но это больше относится к противодавлению, чем к непрерывному пакетированию.