Использование ключевой группы KeyValueGroupedDataset для более чем двух наборов данных - PullRequest
0 голосов
/ 07 сентября 2018

В старом API Spark на основе RDD было возможно "объединить" до 3 RDD с вашими исходными RDD за один вызов cogroup, предполагая, что все они были парой RDD с одним и тем же ключом.

В новом API набора данных кажется, что мне нужно вызывать groupByKey дважды и cogroup один раз для каждого набора данных, с которым я хочу сгруппироваться.

Например, представьте, что у меня есть набор данных с недавними данными об активности и два набора метаданных, которые я использую для предоставления контекста для деятельности, и я хочу объединить их (история и профиль могут быть большими структурами, и если я объедините их с активностью, объединенный набор данных будет неоправданно большим). Вот что я делаю сегодня:

// These data types all have an "id" field for correlation
val activityData: Dataset[Activity] = getActivity()
val locationHistory: Dataset[LocationHistory] = getLocationHistory()
val profiles: Dataset[Profile] = getProfiles()

// This first cogroup aligns activity with location history
val partialGroup = activityData.groupByKey(_.id)
    .cogroup(locationHistory.groupByKey(_.id)) {
        case (id, activity, location) if activity.nonEmpty =>
            (id, activity, location)

        case _ => None
    }

// This second cogroup adds the profile to complete the grouping
val fullGroup = partialGroup.groupByKey(_._1)
    .cogroup(profiles.groupByKey(_.id)) {
        case (id, activityAndLocation, profile) =>
            activityAndLocation.map { case(_, activity, location) =>
                (id, activity, location, profile)
            }
    }

Кажется, это немного длинно для того, что было бы по существу однострочным в RDD API. Есть ли другой способ сделать это в API набора данных, который не требует такого большого количества повторений?

...