Акка стрим дозирования - PullRequest
       31

Акка стрим дозирования

0 голосов
/ 20 октября 2019

Изучение Акка Стримс. У меня есть поток записей, многие за единицу времени, уже упорядоченные по времени (из Slick), и я хочу объединить их в группы времени для обработки, определяя, когда изменяется временной шаг.

Пример

case class Record(time: Int, payload: String)

Если входящий поток равен

Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...

Я хотел бы преобразовать это в

Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...

Пока я нашел только группировкуфиксированным числом записей или разбиением на множество подпотоков, но с моей точки зрения мне не нужно несколько подпотоков.

Обновление: Я нашел batch, но это больше относится к противодавлению, чем к непрерывному пакетированию.

1 Ответ

3 голосов
/ 20 октября 2019

statefulMapConcat - это мультитул в библиотеке Akka Streams.

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // needed to print the last element

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        payloads = payloads :+ record.payload
        Nil
      } else {
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)

При выполнении вышеизложенного выдается следующее:

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

Вы можете настроитьпример для печати Batch объектов.

...