Я новичок в разжигании и не имею опыта программирования на Java. Я использую pyspark для обработки очень большого набора данных временных рядов с почти 4000 числовых (плавающих) столбцов и миллиардов строк.
Чего я хочу добиться с помощью этого набора данных:
Данные временного ряда имеют интервалы 10 мс. Я хочу сгруппировать данные по интервалам в 1 с и использовать среднее значение в качестве функции агрегирования.
Вот код, который я использую для чтения разделенных файлов паркета.
df = (spark.read.option("mergeSchema", "true")
.parquet("/data/"))
Вот фрагмент кода для группирования и агрегации, который я написал:
col_list = [... list of numeric columns in the dataframe ...]
agg_funcs = [mean] # I also want to add other aggregation functions here later.
exprs = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]
result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])
.agg(*exprs))
Теперь я хочу записать приведенный выше кадр данных результата в разделенный паркет:
(result.write.mode("overwrite")
.partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')
.parquet('/out/'))
Но я получаю Java-кучу из-за ошибки памяти.
Я попытался увеличить spark.sql.shuffle.partitions
, чтобы каждый раздел был меньшего размера, но это не помогло.
Моя конфигурация искрового кластера:
2 workers + 1 master
Both the worker nodes have 256 GB RAM and 32 cores each.
Master node has 8 cores and 32 GB RAM.
Конфигурация, которую я указываю для своей работы с искрой:
{
"driverMemory": "8G",
"driverCores": 4,
"executorMemory": "20G",
"executorCores": 4,
"numExecutors": 14,
"conf": {
"spark.sql.shuffle.partitions": 2000000
}
}
Ниже приведены некоторые скриншоты из Ambari относительно конфигурации кластера:
память YARN
процессор YARN
Может кто-нибудь помочь мне понять, почему существует проблема с памятью и какИсправить это? Спасибо.