spark - Java-куча нехватки памяти при выполнении групповой работы и агрегации на большом фрейме данных - PullRequest
0 голосов
/ 03 октября 2019

Я новичок в разжигании и не имею опыта программирования на Java. Я использую pyspark для обработки очень большого набора данных временных рядов с почти 4000 числовых (плавающих) столбцов и миллиардов строк.

Чего я хочу добиться с помощью этого набора данных:

Данные временного ряда имеют интервалы 10 мс. Я хочу сгруппировать данные по интервалам в 1 с и использовать среднее значение в качестве функции агрегирования.

Вот код, который я использую для чтения разделенных файлов паркета.

df = (spark.read.option("mergeSchema", "true")
           .parquet("/data/"))

Вот фрагмент кода для группирования и агрегации, который я написал:

col_list = [... list of numeric columns in the dataframe ...]

agg_funcs = [mean]   # I also want to add other aggregation functions here later.

exprs     = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]

result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])
            .agg(*exprs))

Теперь я хочу записать приведенный выше кадр данных результата в разделенный паркет:

(result.write.mode("overwrite")
       .partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')
       .parquet('/out/'))

Но я получаю Java-кучу из-за ошибки памяти.

Я попытался увеличить spark.sql.shuffle.partitions, чтобы каждый раздел был меньшего размера, но это не помогло.

Моя конфигурация искрового кластера:

2 workers + 1 master
Both the worker nodes have 256 GB RAM and 32 cores each.
Master node has 8 cores and 32 GB RAM.

Конфигурация, которую я указываю для своей работы с искрой:

{
    "driverMemory": "8G", 
    "driverCores": 4, 
    "executorMemory": "20G", 
    "executorCores": 4, 
    "numExecutors": 14, 
    "conf": {
        "spark.sql.shuffle.partitions": 2000000
    }
}

Ниже приведены некоторые скриншоты из Ambari относительно конфигурации кластера:

память YARN

процессор YARN

Может кто-нибудь помочь мне понять, почему существует проблема с памятью и какИсправить это? Спасибо.

Ответы [ 2 ]

0 голосов
/ 04 октября 2019

почему бы вам не объединить 'Year', 'Month', 'Day', 'Hour', 'Minute', 'Second', прежде чем делать groupBy. После groupBy вы можете воссоздать эти столбцы. Я думаю, что попробуйте без изменения executor-ядер, а затем уменьшить его до 15, а затем до 7. 4 слишком мало, я думаю

0 голосов
/ 04 октября 2019

Я полагаю, что это происходит из-за перекоса данных, и один из ваших разделов получает OOM.

Spark's groupBy () требует загрузки всех значений ключа одновременно в память, чтобы выполнить groupby.

Увеличение разделов не работает, потому что у вас могут быть большие данные с похожей группой по ключу. Проверьте, если у вас есть данные с аналогичной группой по ключу.

Проверьте эту статью, которая объясняет это лучше.

...