В старом API Spark на основе RDD было возможно "объединить" до 3 RDD с вашими исходными RDD за один вызов cogroup
, предполагая, что все они были парой RDD с одним и тем же ключом.
В новом API набора данных кажется, что мне нужно вызывать groupByKey
дважды и cogroup
один раз для каждого набора данных, с которым я хочу сгруппироваться.
Например, представьте, что у меня есть набор данных с недавними данными об активности и два набора метаданных, которые я использую для предоставления контекста для деятельности, и я хочу объединить их (история и профиль могут быть большими структурами, и если я объедините их с активностью, объединенный набор данных будет неоправданно большим). Вот что я делаю сегодня:
// These data types all have an "id" field for correlation
val activityData: Dataset[Activity] = getActivity()
val locationHistory: Dataset[LocationHistory] = getLocationHistory()
val profiles: Dataset[Profile] = getProfiles()
// This first cogroup aligns activity with location history
val partialGroup = activityData.groupByKey(_.id)
.cogroup(locationHistory.groupByKey(_.id)) {
case (id, activity, location) if activity.nonEmpty =>
(id, activity, location)
case _ => None
}
// This second cogroup adds the profile to complete the grouping
val fullGroup = partialGroup.groupByKey(_._1)
.cogroup(profiles.groupByKey(_.id)) {
case (id, activityAndLocation, profile) =>
activityAndLocation.map { case(_, activity, location) =>
(id, activity, location, profile)
}
}
Кажется, это немного длинно для того, что было бы по существу однострочным в RDD API. Есть ли другой способ сделать это в API набора данных, который не требует такого большого количества повторений?