Слишком медленная работа с широким массивом данных в Pyspark - PullRequest
0 голосов
/ 22 октября 2018

Я новичок в Spark и пытаюсь использовать pyspark (Spark 2.2) для выполнения операций фильтрации и агрегации с очень широким набором функций (~ 13 миллионов строк, 15 000 столбцов).Набор функций хранится в виде паркетных файлов на диске S3.Я запускаю тестовый скрипт, чтобы загрузить набор функций в кадре данных, выбрать несколько тысяч записей, сгруппировать по определенному коду региона и усреднить каждый из 15 тыс. Столбцов функций.Проблема заключается в том, что задание либо дает ошибку, либо занимает слишком много времени (около 8 часов для выборки из 5% записей).

Существуют ли способы ускорения операций такого типа на широком фрейме данных вPyspark?Я использую ноутбуки Jupyter и хотел бы, чтобы эти запросы выполнялись в считанные минуты, а не часы.

Вот мой код

df_feature_store = spark.read.parquet(PATH_FEATURE_STORE).sample(False, 0.05, seed=0).cache()
    logger.info("Initial data set loaded and sampled")

    df_selected_rors = spark.read.csv(PATH_DATA_SOURCE+"ROR Sample.csv", header=True)
    agg_cols = [x for x in df_feature_store.columns if re.search("^G\d{2}",x)]
    agg_cols = agg_cols[:10]  # just testing with fewer columns
    expr = {x:"mean" for x in agg_cols}
    joineddf = df_feature_store.join(df_selected_rors, df_feature_store.ROLLOUTREGION_IDENTIFIER == df_selected_rors.ROR, "inner")
    aggdf = joineddf.groupby("ROLLOUT_REGION_IDENTIFIER").agg(expr)
    # replace groupby
    # loop for a 1000 column aggregations 
    # transpose columns into rows as arrays
    aggdf.write.mode("overwrite").csv(PATH_FEATURE_STORE + "aggregated", header=True)
    logger.info("Done")`

1 Ответ

0 голосов
/ 22 октября 2018

Я бы попытался разделить это, чтобы увидеть, в чем проблема

  • В некоторых версиях Spark есть проблемы со многими, многими столбцами в DF;Я не могу вспомнить подробности.
  • читать из CSV и сохранять в Parquet локально, перед любыми запросами, фильтруя столбцы, если вы можете
  • запускать запросы Parquet local- to Parquet local

S3 в качестве места назначения работы (а) выполняет медленно и (b) рискует потерять данные из-за возможной согласованности S3.Если вы не используете EMRFS, совместимую с S3mper / S3Guard / EMR, вы не должны использовать ее как прямое назначение работы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...