У меня есть метод PySpark, который применяет функцию разнесения к каждому столбцу массива в DataFrame.
def explode_column(df, column):
select_cols = list(df.columns)
col_position = select_cols.index(column)
select_cols[col_position] = explode_outer(column).alias(column)
return df.select(select_cols)
def explode_all_arrays(df):
still_has_arrays = True
exploded_df = df
while still_has_arrays:
still_has_arrays = False
for f in exploded_df.schema.fields:
if isinstance(f.dataType, ArrayType):
print(f"Exploding: {f}")
still_has_arrays = True
exploded_df = explode_column(exploded_df, f.name)
return exploded_df
Когда у меня есть небольшое количество столбцов для разнесения, он работает отлично, но на больших DataFrames (~ 200 столбцов с ~ 40 взрывами), после завершения DataFrame не может быть записан как Parquet.
Даже небольшой объем данных не удается (400 КБ) не во время обработки метода, а на этапе записи.
Подсказки? Я попытался записать фреймворк как таблицу и как паркетный файл. Он работает, когда я сохраняю его как временное представление. Под «не могу писать» я имею в виду, что он продолжает обработку и ничего не происходит. Задание выполняется часами даже с очень маленьким файлом, и мне нужно принудительно остановить его.
Если я сохраню его как временное представление, я могу использовать, например, «select * from temp_table», но когда я попробуйте записать результат в постоянную таблицу или паркетный файл, он будет обрабатываться вечно.
Есть подсказки?