Как вы группируете элементы потока, используя ключи, присутствующие в том же потоке? - PullRequest
1 голос
/ 12 июня 2019

Итак, у меня есть следующий тип:

case class Episode(
  parentTconst: String,
  seasonNumber: Int,
  episodeNumber: Int
)

И следующий источник:

val episodeSource: Source[Episode, _] = FileIO.fromPath(Paths.get(myDataFilePath)).via(myDataParserToEpisode)

Пример списка Эпизодов будет:

Seq(
  Episode("gameof", 5, 8),
  Episode("mentalist", 2, 4),
  Episode("gameof", 5, 8),
  Episode("mentalist", 1, 8),
  Episode("rikiandmanual", 1, 8)
)

Я пытаюсь создать следующий поток (я не уверен, что это правильная подпись, пожалуйста, посоветуйте, может быть, это Sink, так как я использую весь поток для получения карты):

def gimmeThoseEpisodeGroups: Flow[Episode, Map[String, Seq[Episode]], _]

, чтовыглядеть так, если напечатано:

Map(
  "gameof"        -> Seq(Episode("gameof", 5, 8), Episode("gameof", 5, 8)),
  "mentalist"     -> Seq(Episode("mentalist", 2, 4), Episode("mentalist", 1, 8)),
  "rikiandmanual" -> Seq(Episode("rikiandmanual", 1, 8))
)

Я пробовал так много комбинаций, которые не работали.Может быть, подход не правильный.

Я думаю, что я должен использовать groupBy.Другое дело, может быть, мне нужно использовать поток в первый раз, чтобы получить список ключей для групп (или, возможно, найти способ использовать его один раз, но с двумя результатами).

Я DuckDucked и выяснил,о трансляции, но я еще не обдумал это.

    def getGroupedByParentTConst: Flow[Episode, Map[String, Seq[Episode]], _] = Flow[Episode].groupBy(2, _.parentTconst)

Но это не помогло (во всяком случае, я чувствую, что это неправильный путь.

Кроме того, я решил, что яизвлекет parentTconst для использования их ключей группировки:

    def getParentTConst: Flow[Episode, Set[String], _] = Flow[Episode].fold(Set.empty[String]) {
      (right, left) => { right ++ Set(left.parentTconst) }
    }

Это работает, но я застрял, выясняя, как использовать их для группировки моего исходного источника ...

Кроме того, еслиЯ использую список ключей, это означает, что мне придется использовать два потока, чтобы получить мою группировку.

Я думаю, что у меня есть проблема Утки, это не очень сложная проблема, но, поскольку у меня есть ограничениеиспользование akka-streams не совсем просто.

Любая помощь будет признательна.

1 Ответ

1 голос
/ 13 июня 2019

То, чего вы пытаетесь достичь, не может быть выполнено в потоковом режиме, так как создание карты всех сгруппированных эпизодов требует чтения всех данных в памяти. Промежуточного результата нет.

Если вы все еще хотите это сделать, вы можете использовать сложение аналогично тому, что вы предлагаете

Flow[Episode].fold(Map.empty[String, List[Episode]]) { (map, e) ⇒
  val key = e.parentTconst
  map + (key → v :: map.getOrElse(key, Nil))
}

Но так как при этом будут прочитаны все файлы в памяти, вы также можете избавиться от хлопот, связанных с использованием akka-streams, и просто использовать scala.io.Source.

Если у вас так много данных, которые вы не можете уместить в памяти, вам нужно изменить требование.

...