Объединение данных не-json и json может быть немного сложнее.Приведенное ниже решение создает структуру JSON для всех столбцов, включая ID и Имя, поэтому ее близкая аппроксимация конечного результата ..
сначала позволяет создать пример данных -
list1 = [1,"Jack"],[2,"Jill"],[3,"James"]
df1=spark.createDataFrame(list1,schema=["id","Name"])
list2= [101,"Activity1",1],[101,"Activity2",1],[201,"Activity3",2],[301,"Activity4",3]
df2=spark.createDataFrame(list2,schema=['Id','Activity','UserId'])
, а затем зарегистрировать оба кадра данных каквременные таблицы, поэтому мы можем выполнить sql для него, чтобы отформатировать данные так, как мы хотим -
df1.registerTempTable("table1")
df2.registerTempTable("table2")
Затем запустить sql, который использует комбинацию collect_list
и named_struct
, чтобы точно соответствовать вашим окончательным структурным требованиям
df3= spark.sql("""
WITH tmp
AS (SELECT t1.id,
Collect_list(Named_struct("id", t2.id, "name", t2.activity)) AS
Activities
FROM table1 t1
JOIN table2 t2
ON ( t1.id = t2.userid )
GROUP BY t1.id)
SELECT tmp.id,
t3.NAME,
tmp.activities
FROM tmp
JOIN table1 t3
ON ( tmp.id = t3.id )
""")
df3.toJSON().collect()
это дает мне результат как -
['{"id":1,"NAME":"Jack","activities":[{"id":101,"name":"Activity1"},{"id":101,"name":"Activity2"}]}',
'{"id":3,"NAME":"James","activities":[{"id":301,"name":"Activity4"}]}',
'{"id":2,"NAME":"Jill","activities":[{"id":201,"name":"Activity3"}]}']
, если я удаляю toJSON()
преобразований и просто показываю результат, он отображается как
df3.show()
+---+-----+-----------------------------------+
| id| NAME| activities |
+---+-----+-----------------------------------+
| 1| Jack|[[101, Activity1],[101, Activity2]]|
| 3|James| [[301, Activity4]] |
| 2| Jill| [[201, Activity3]] |
+---+-----+-----------------------------------+