Как сказано, мы можем иметь пару Key -> Value в scala, но не с аналогичным представлением Python
Сначала прочитайте данные
scala> val df = Seq((1111111111,"user1-316","2019-10-1114:01:49",1,"others"), (1111111111,"user1","2019-10-1114:25:35",2,"mobile"), (1111111111,"user2","2019-10-1114:30:05",3,"others"), (1111111112,"user2","2019-10-1114:16:58",4,"others"), (1111111113,"user2","2019-10-1114:32:00",1,"mobile")).toDF("cid","uid","date","rating","type")
df: org.apache.spark.sql.DataFrame = [cid: int, uid: string ... 3 more fields]
scala> df.show
+----------+---------+------------------+------+------+
| cid| uid| date|rating| type|
+----------+---------+------------------+------+------+
|1111111111|user1-316|2019-10-1114:01:49| 1|others|
|1111111111| user1|2019-10-1114:25:35| 2|mobile|
|1111111111| user2|2019-10-1114:30:05| 3|others|
|1111111112| user2|2019-10-1114:16:58| 4|others|
|1111111113| user2|2019-10-1114:32:00| 1|mobile|
+----------+---------+------------------+------+------+
Теперь мы конвертируем cid, рейтинг, дату к списку
scala> val df1 = df.groupBy($"uid", $"type").agg(collect_list(array($"cid", $"rating", $"date")).as("aggNew"))
df1: org.apache.spark.sql.DataFrame = [uid: string, type: string ... 1 more field]
scala> df1.show(false)
+---------+------+--------------------------------------------------------------------------------------------------+
|uid |type |aggNew |
+---------+------+--------------------------------------------------------------------------------------------------+
|user1 |mobile|[WrappedArray(1111111111, 2, 2019-10-1114:25:35)] |
|user2 |mobile|[WrappedArray(1111111113, 1, 2019-10-1114:32:00)] |
|user1-316|others|[WrappedArray(1111111111, 1, 2019-10-1114:01:49)] |
|user2 |others|[WrappedArray(1111111111, 3, 2019-10-1114:30:05), WrappedArray(1111111112, 4, 2019-10-1114:16:58)]|
+---------+------+--------------------------------------------------------------------------------------------------+
Кроме того, применение groupBy к uid для получения требуемого (key, values)
scala> df1.groupBy($"uid").agg(collect_list(map($"type", $"aggNew"))).show(false)
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|uid |collect_list(map(type, aggNew)) |
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user1-316|[Map(others -> WrappedArray(WrappedArray(1111111111, 1, 2019-10-1114:01:49)))] |
|user1 |[Map(mobile -> WrappedArray(WrappedArray(1111111111, 2, 2019-10-1114:25:35)))] |
|user2 |[Map(mobile -> WrappedArray(WrappedArray(1111111113, 1, 2019-10-1114:32:00))), Map(others -> WrappedArray(WrappedArray(1111111111, 3, 2019-10-1114:30:05), WrappedArray(1111111112, 4, 2019-10-1114:16:58)))]|
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Добавлена схема
root
|-- uid: string (nullable = true)
|-- collect_list(map(type, aggNew)): array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: array (valueContainsNull = true)
| | | |-- element: array (containsNull = true)
| | | | |-- element: string (containsNull = true)