Я новичок в Spark и пытаюсь использовать pyspark (Spark 2.2) для выполнения операций фильтрации и агрегации с очень широким набором функций (~ 13 миллионов строк, 15 000 столбцов).Набор функций хранится в виде паркетных файлов на диске S3.Я запускаю тестовый скрипт, чтобы загрузить набор функций в кадре данных, выбрать несколько тысяч записей, сгруппировать по определенному коду региона и усреднить каждый из 15 тыс. Столбцов функций.Проблема заключается в том, что задание либо дает ошибку, либо занимает слишком много времени (около 8 часов для выборки из 5% записей).
Существуют ли способы ускорения операций такого типа на широком фрейме данных вPyspark?Я использую ноутбуки Jupyter и хотел бы, чтобы эти запросы выполнялись в считанные минуты, а не часы.
Вот мой код
df_feature_store = spark.read.parquet(PATH_FEATURE_STORE).sample(False, 0.05, seed=0).cache()
logger.info("Initial data set loaded and sampled")
df_selected_rors = spark.read.csv(PATH_DATA_SOURCE+"ROR Sample.csv", header=True)
agg_cols = [x for x in df_feature_store.columns if re.search("^G\d{2}",x)]
agg_cols = agg_cols[:10] # just testing with fewer columns
expr = {x:"mean" for x in agg_cols}
joineddf = df_feature_store.join(df_selected_rors, df_feature_store.ROLLOUTREGION_IDENTIFIER == df_selected_rors.ROR, "inner")
aggdf = joineddf.groupby("ROLLOUT_REGION_IDENTIFIER").agg(expr)
# replace groupby
# loop for a 1000 column aggregations
# transpose columns into rows as arrays
aggdf.write.mode("overwrite").csv(PATH_FEATURE_STORE + "aggregated", header=True)
logger.info("Done")`