Spark DataFrame для вложенного JSON - PullRequest
0 голосов
/ 08 ноября 2018

У меня есть фрейм данных joinDf, созданный путем объединения следующих четырех фреймов данных на userId:

val detailsDf = Seq((123,"first123","xyz"))
                .toDF("userId","firstName","address")


val emailDf = Seq((123,"abc@gmail.com"),
                  (123,"def@gmail.com"))
              .toDF("userId","email")


val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
            .toDF("userId","foodName","isFavFood","cuisine","score")


val gameDf = Seq((123,"chess",false,2),
                 (123,"football",true,1))
             .toDF("userId","gameName","isOutdoor","score")

val joinDf = detailsDf
            .join(emailDf, Seq("userId"))
            .join(foodDf, Seq("userId"))
            .join(gameDf, Seq("userId"))

User еда и фавориты игры должны быть упорядочены по возрастанию.

Я пытаюсь создать результат из этого joinDf, где JSON выглядит следующим образом:

[
  {
  "userId": "123",
  "firstName": "first123",
  "address": "xyz",
  "UserFoodFavourites": [
    {
     "foodName": "food1",
     "isFavFood": "true",
     "cuisine": "Mediterranean",
    },
    {
     "foodName": "food2",
     "isFavFood": "false",
     "cuisine": "Italian",
    },
    {
     "foodName": "food3",
     "isFavFood": "true",
     "cuisine": "American",
    }
   ]
   "UserEmail": [
     "abc@gmail.com",
     "def@gmail.com"
   ]
   "UserGameFavourites": [
     {
      "gameName": "football",
      "isOutdoor": "true"
     },
     {
      "gameName": "chess",
      "isOutdoor": "false"
     }
   ]
  }
]

Должен ли я использовать joinDf.groupBy().agg(collect_set())?

Любая помощь будет оценена.

Ответы [ 2 ]

0 голосов
/ 10 ноября 2018

Основная проблема объединения перед группировкой и сбором списков заключается в том, что объединение приведет к большому количеству записей для свертывания группы, в вашем примере это 12 записей после объединения и перед группировкой, также вам нужно беспокоиться о выбор "firstName", "address" out detailsDf из 12 дубликатов. Чтобы избежать обеих проблем, вы можете предварительно обработать продовольственные, электронные и игровые фреймы данных, используя struct и groupBy, и присоединить их к detailsDf без риска взрыва ваших данных из-за нескольких записей с одним и тем же userId в объединенных таблицах.

val detailsDf = Seq((123,"first123","xyz"))
            .toDF("userId","firstName","address")


val emailDf = Seq((123,"abc@gmail.com"),
              (123,"def@gmail.com"))
          .toDF("userId","email")


val foodDf = Seq((123,"food2",false,"Italian",2),
             (123,"food3",true,"American",3),
             (123,"food1",true,"Mediterranean",1))
        .toDF("userId","foodName","isFavFood","cuisine","score")


val gameDf = Seq((123,"chess",false,2),
             (123,"football",true,1))
         .toDF("userId","gameName","isOutdoor","score")

val emailGrp = emailDf.groupBy("userId").agg(collect_list("email").as("UserEmail"))

val foodGrp = foodDf
          .select($"userId", struct("score", "foodName","isFavFood","cuisine").as("UserFoodFavourites"))
          .groupBy("userId").agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))

val gameGrp = gameDf
          .select($"userId", struct("gameName","isOutdoor","score").as("UserGameFavourites"))
          .groupBy("userId").agg(collect_list("UserGameFavourites").as("UserGameFavourites"))

val result = detailsDf.join(emailGrp, Seq("userId"))
        .join(foodGrp, Seq("userId"))
        .join(gameGrp, Seq("userId"))

result.show(100, false)

Выход:

+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
|userId|firstName|address|UserEmail                     |UserFoodFavourites                                                                       |UserGameFavourites                      |
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
|123   |first123 |xyz    |[abc@gmail.com, def@gmail.com]|[[1, food1, true, Mediterranean], [2, food2, false, Italian], [3, food3, true, American]]|[[chess, false, 2], [football, true, 1]]|
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+

Поскольку все groupBy выполняются на userId и также включаются, спарк оптимизирует его достаточно хорошо.

ОБНОВЛЕНИЕ 1 : После того, как @ user238607 указал, что я пропустил исходное требование сортировки предпочтений в продуктах питания, быстро исправил и поместил столбец оценка в качестве первого элемента структуры UserFoodFavourites и использует функцию sort_array для упорядочения данных в нужном порядке без принудительного выполнения дополнительной операции перемешивания. Обновлен код и его вывод.

0 голосов
/ 10 ноября 2018

Мое решение основано на найденных ответах здесь и здесь

Используется функция Window. В нем показано, как создать вложенный список предпочтений в еде для данного userid на основе оценки продуктов. Здесь мы создаем struct из FoodDetails из столбцов, которые у нас есть

val foodModifiedDf = foodDf.withColumn("FoodDetails",
                            struct("foodName","isFavFood", "cuisine","score"))
                            .drop("foodName","isFavFood", "cuisine","score")

println("Just printing the food detials here")
foodModifiedDf.show(10, truncate = false)

Здесь мы создаем функцию Windowing, которая будет накапливать список для userId на основе FoodDetails.score в порядке убывания. Оконная функция, когда применяется, продолжает накапливать список, поскольку она встречает новые строки с тем же userId. После того, как мы закончили накопление, мы должны сделать groupBy над userId, чтобы выбрать самый большой список.

import org.apache.spark.sql.expressions.Window


val window = Window.partitionBy("userId").orderBy( desc("FoodDetails.score"))

val userAndFood = detailsDf.join(foodModifiedDf, "userId")

val newUF  = userAndFood.select($"*", collect_list("FoodDetails").over(window) as "FDNew")

println(" UserAndFood dataframe after windowing function applied")
newUF.show(10, truncate = false)

val resultUF = newUF.groupBy("userId")
                  .agg(max("FDNew"))

println("Final result after select the maximum length list")
resultUF.show(10, truncate = false)

Вот как в итоге выглядит результат:

+------+-----------------------------------------------------------------------------------------+
|userId|max(FDNew)                                                                               |
+------+-----------------------------------------------------------------------------------------+
|123   |[[food3, true, American, 3], [food2, false, Italian, 2], [food1, true, Mediterranean, 1]]|
+------+-----------------------------------------------------------------------------------------+

Учитывая этот фрейм данных, должно быть проще выписать вложенный json.

...