Мне нужно агрегировать мои данные так, чтобы они генерировали этот вывод:
Вывод JSON
{
"people": [
{
"firstName": "Jimi",
"lastName": "Hendrix",
"age": "27"
},
{
"firstName": "Jimmy",
"lastName": "Page",
"age": "75"
}
]
}
Однако при запуске функции агрегирования (ниже),Я получаю эту ошибку:
Caused by: org:apache.spark.SparkSession: Job aborted due to stage failure: Total size of serialized results of <task_size> task (20GB) is bigger than spark.driver.maxResultSize (20.0 GB)
Это наводит меня на мысль, что именно эта проблема вызвана функцией collect_list
. Вместо того, чтобы выполнять задачи параллельно, они выполняются на одном узле и не хватает памяти.
Какой самый оптимальный способ создания вывода JSON? Есть ли способы оптимизировать функцию collect_list
?
Пример кода:
def aggregate(df):
return df.agg(collect_list(struct(
df.firstName,
df.lastName,
df.age
)).alias('people'))