Итак, у меня есть следующий тип:
case class Episode(
parentTconst: String,
seasonNumber: Int,
episodeNumber: Int
)
И следующий источник:
val episodeSource: Source[Episode, _] = FileIO.fromPath(Paths.get(myDataFilePath)).via(myDataParserToEpisode)
Пример списка Эпизодов будет:
Seq(
Episode("gameof", 5, 8),
Episode("mentalist", 2, 4),
Episode("gameof", 5, 8),
Episode("mentalist", 1, 8),
Episode("rikiandmanual", 1, 8)
)
Я пытаюсь создать следующий поток (я не уверен, что это правильная подпись, пожалуйста, посоветуйте, может быть, это Sink, так как я использую весь поток для получения карты):
def gimmeThoseEpisodeGroups: Flow[Episode, Map[String, Seq[Episode]], _]
, чтовыглядеть так, если напечатано:
Map(
"gameof" -> Seq(Episode("gameof", 5, 8), Episode("gameof", 5, 8)),
"mentalist" -> Seq(Episode("mentalist", 2, 4), Episode("mentalist", 1, 8)),
"rikiandmanual" -> Seq(Episode("rikiandmanual", 1, 8))
)
Я пробовал так много комбинаций, которые не работали.Может быть, подход не правильный.
Я думаю, что я должен использовать groupBy.Другое дело, может быть, мне нужно использовать поток в первый раз, чтобы получить список ключей для групп (или, возможно, найти способ использовать его один раз, но с двумя результатами).
Я DuckDucked и выяснил,о трансляции, но я еще не обдумал это.
def getGroupedByParentTConst: Flow[Episode, Map[String, Seq[Episode]], _] = Flow[Episode].groupBy(2, _.parentTconst)
Но это не помогло (во всяком случае, я чувствую, что это неправильный путь.
Кроме того, я решил, что яизвлекет parentTconst для использования их ключей группировки:
def getParentTConst: Flow[Episode, Set[String], _] = Flow[Episode].fold(Set.empty[String]) {
(right, left) => { right ++ Set(left.parentTconst) }
}
Это работает, но я застрял, выясняя, как использовать их для группировки моего исходного источника ...
Кроме того, еслиЯ использую список ключей, это означает, что мне придется использовать два потока, чтобы получить мою группировку.
Я думаю, что у меня есть проблема Утки, это не очень сложная проблема, но, поскольку у меня есть ограничениеиспользование akka-streams не совсем просто.
Любая помощь будет признательна.