Неизменное дозирование потока - PullRequest
2 голосов
/ 22 октября 2019

Существует ли неизменная альтернатива решению в этом вопросе, который объединяет данные в потоке:

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // needed to print the last element

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        payloads = payloads :+ record.payload
        Nil
      } else {
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)

Производит

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

1 Ответ

0 голосов
/ 29 октября 2019

вы на хорошем пути, неизменность и отсутствие состояний - важный аспект в программировании параллельного распределенного программного обеспечения. Я делюсь этим scastie с вашим примером кода при использовании groupBy из akka . Дайте мне знать, если это поможет вам. Вывод как

(3,List(!))
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
...