преобразовать Spark DataFrame в словарь Scala, как формат - PullRequest
0 голосов
/ 18 октября 2019

У меня есть искровой фрейм данных, который мне нужно преобразовать в (key, value) пар. ниже приведен формат:

+--------------------+--------------------+-------------------+------+------+
|                 cid|                 uid|               date|rating|  type|
+--------------------+--------------------+-------------------+------+------+
|          1111111111|           user1-316|2019-10-11 14:01:49|     1|others|
|          1111111111|               user1|2019-10-11 14:25:35|     2|mobile|
|          1111111111|               user2|2019-10-11 14:30:05|     3|others|
|          1111111112|               user2|2019-10-11 14:16:58|     4|others|
|          1111111113|               user2|2019-10-11 14:32:00|     1|mobile|
+--------------------+--------------------+-------------------+------+------+

мне нужно агрегировать его на основе uid и создать список из cid, rating, date для каждого type

 uid       | history
-----------+--------------------------------------------------------
 user1-316 | {"others": [["1111111111", 1, "2019-10-11 14:01:49"]]}
 user1     | {"mobile": [["1111111111", 2, "2019-10-11 14:25:35"]]}
 user2     | {"others": [["1111111111", 3, "2019-10-11 14:30:05"],["1111111112", 4, "2019-10-11 14:16:58"]],"mobile":[["1111111113", 1, "2019-10-11 14:32:00"]]}

В Python я могу добиться этого, так как у нас есть формат dict. Как мы можем сделать это в Scala.

Ответы [ 2 ]

0 голосов
/ 18 октября 2019

Как сказано, мы можем иметь пару Key -> Value в scala, но не с аналогичным представлением Python

Сначала прочитайте данные

scala> val df = Seq((1111111111,"user1-316","2019-10-1114:01:49",1,"others"), (1111111111,"user1","2019-10-1114:25:35",2,"mobile"), (1111111111,"user2","2019-10-1114:30:05",3,"others"), (1111111112,"user2","2019-10-1114:16:58",4,"others"), (1111111113,"user2","2019-10-1114:32:00",1,"mobile")).toDF("cid","uid","date","rating","type")
df: org.apache.spark.sql.DataFrame = [cid: int, uid: string ... 3 more fields]
scala> df.show
+----------+---------+------------------+------+------+
|       cid|      uid|              date|rating|  type|
+----------+---------+------------------+------+------+
|1111111111|user1-316|2019-10-1114:01:49|     1|others|
|1111111111|    user1|2019-10-1114:25:35|     2|mobile|
|1111111111|    user2|2019-10-1114:30:05|     3|others|
|1111111112|    user2|2019-10-1114:16:58|     4|others|
|1111111113|    user2|2019-10-1114:32:00|     1|mobile|
+----------+---------+------------------+------+------+

Теперь мы конвертируем cid, рейтинг, дату к списку

scala> val df1 = df.groupBy($"uid", $"type").agg(collect_list(array($"cid", $"rating", $"date")).as("aggNew"))
df1: org.apache.spark.sql.DataFrame = [uid: string, type: string ... 1 more field]

scala> df1.show(false)
+---------+------+--------------------------------------------------------------------------------------------------+
|uid      |type  |aggNew                                                                                            |
+---------+------+--------------------------------------------------------------------------------------------------+
|user1    |mobile|[WrappedArray(1111111111, 2, 2019-10-1114:25:35)]                                                 |
|user2    |mobile|[WrappedArray(1111111113, 1, 2019-10-1114:32:00)]                                                 |
|user1-316|others|[WrappedArray(1111111111, 1, 2019-10-1114:01:49)]                                                 |
|user2    |others|[WrappedArray(1111111111, 3, 2019-10-1114:30:05), WrappedArray(1111111112, 4, 2019-10-1114:16:58)]|
+---------+------+--------------------------------------------------------------------------------------------------+

Кроме того, применение groupBy к uid для получения требуемого (key, values)

scala> df1.groupBy($"uid").agg(collect_list(map($"type", $"aggNew"))).show(false)
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|uid      |collect_list(map(type, aggNew))                                                                                                                                                                              |
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user1-316|[Map(others -> WrappedArray(WrappedArray(1111111111, 1, 2019-10-1114:01:49)))]                                                                                                                               |
|user1    |[Map(mobile -> WrappedArray(WrappedArray(1111111111, 2, 2019-10-1114:25:35)))]                                                                                                                               |
|user2    |[Map(mobile -> WrappedArray(WrappedArray(1111111113, 1, 2019-10-1114:32:00))), Map(others -> WrappedArray(WrappedArray(1111111111, 3, 2019-10-1114:30:05), WrappedArray(1111111112, 4, 2019-10-1114:16:58)))]|
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Добавлена ​​схема

root
 |-- uid: string (nullable = true)
 |-- collect_list(map(type, aggNew)): array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: array (valueContainsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
0 голосов
/ 18 октября 2019

Обновленный ответ:

Вы можете попробовать что-то вроде этого. Я не уверен, что dict в Python, но для (ключ, значение) Scala имеет тип map.


scala> df.show
+----------+---------+-------------------+------+------+
|       cid|      uid|               date|rating|  type|
+----------+---------+-------------------+------+------+
|1111111111|user1-316|2019-10-11 14:01:49|     1|others|
|1111111111|    user1|2019-10-11 14:25:35|     2|mobile|
|1111111111|    user2|2019-10-11 14:30:05|     3|others|
|1111111112|    user2|2019-10-11 14:16:58|     4|others|
|1111111113|    user2|2019-10-11 14:32:00|     1|mobile|
+----------+---------+-------------------+------+------+

scala> df.withColumn("col1",array("cid","rating","date"))
         .groupBy("type","uid")
         .agg(map(col("type"),collect_list("col1")).as("col2")) 
         .groupBy("uid")
         .agg(collect_list(col("col2")).as("history"))
         .show(false)

+---------+----------------------------------------------------------------------------------------------------------------------------------------------+
|uid      |history                                                                                                                                       |
+---------+----------------------------------------------------------------------------------------------------------------------------------------------+
|user1-316|[[others -> [[1111111111, 1, 2019-10-11 14:01:49]]]]                                                                                          |
|user1    |[[mobile -> [[1111111111, 2, 2019-10-11 14:25:35]]]]                                                                                          |
|user2    |[[others -> [[1111111111, 3, 2019-10-11 14:30:05], [1111111112, 4, 2019-10-11 14:16:58]]], [mobile -> [[1111111113, 1, 2019-10-11 14:32:00]]]]|
+---------+----------------------------------------------------------------------------------------------------------------------------------------------+


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...