Не удается записать DataFrame после использования функции разнесения для нескольких столбцов - PullRequest
0 голосов
/ 03 августа 2020

У меня есть метод PySpark, который применяет функцию разнесения к каждому столбцу массива в DataFrame.

def explode_column(df, column):
    select_cols = list(df.columns)
    col_position = select_cols.index(column)
    select_cols[col_position] = explode_outer(column).alias(column)
    return df.select(select_cols)

def explode_all_arrays(df):
    still_has_arrays = True
    exploded_df = df

    while still_has_arrays:
        still_has_arrays = False
        for f in exploded_df.schema.fields:
            if isinstance(f.dataType, ArrayType):
                print(f"Exploding: {f}")
                still_has_arrays = True
                exploded_df = explode_column(exploded_df, f.name)

    return exploded_df

Когда у меня есть небольшое количество столбцов для разнесения, он работает отлично, но на больших DataFrames (~ 200 столбцов с ~ 40 взрывами), после завершения DataFrame не может быть записан как Parquet.

Даже небольшой объем данных не удается (400 КБ) не во время обработки метода, а на этапе записи.

Подсказки? Я попытался записать фреймворк как таблицу и как паркетный файл. Он работает, когда я сохраняю его как временное представление. Под «не могу писать» я имею в виду, что он продолжает обработку и ничего не происходит. Задание выполняется часами даже с очень маленьким файлом, и мне нужно принудительно остановить его.

Если я сохраню его как временное представление, я могу использовать, например, «select * from temp_table», но когда я попробуйте записать результат в постоянную таблицу или паркетный файл, он будет обрабатываться вечно.

Есть подсказки?

...