Оптимизировать функцию Collect_List Pyspark - PullRequest
1 голос
/ 17 октября 2019

Мне нужно агрегировать мои данные так, чтобы они генерировали этот вывод:

Вывод JSON

{
    "people": [
       {
         "firstName": "Jimi",
         "lastName": "Hendrix",
         "age": "27"
       },
       {
         "firstName": "Jimmy",
         "lastName": "Page",
         "age": "75"
       }
    ]
}

Однако при запуске функции агрегирования (ниже),Я получаю эту ошибку:

Caused by: org:apache.spark.SparkSession: Job aborted due to stage failure: Total size of serialized results of <task_size> task (20GB) is bigger than spark.driver.maxResultSize (20.0 GB)

Это наводит меня на мысль, что именно эта проблема вызвана функцией collect_list. Вместо того, чтобы выполнять задачи параллельно, они выполняются на одном узле и не хватает памяти.

Какой самый оптимальный способ создания вывода JSON? Есть ли способы оптимизировать функцию collect_list?

Пример кода:

def aggregate(df):
    return df.agg(collect_list(struct(
        df.firstName,
        df.lastName,
        df.age
    )).alias('people'))

1 Ответ

0 голосов
/ 21 октября 2019

Когда вы делаете collect_list, все данные собираются для водителя в виде списка. Вместо этого вы можете создать столбец в кадре данных с помощью JSON и сохранить его как CSV.

Альтернативой является увеличение памяти драйвера с помощью -

conf.set ("spark.driver.maxResultSize", "25g")

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...