Что касается контекста iself, я пытаюсь загрузить данные .parquet из S3 в кластер Spark с 1 ядром драйвера (2 ГБ памяти), 4 исполнителями x 2 ядра (2 ГБ памяти) и по Speci c Для каждой итерации обработка выполняется слишком медленно, поэтому для ее завершения требуется + 24 часа (подробнее об этом ниже).
Процесс идет следующим образом (псевдокод).
s3Data
.groupBy(
key1,
key2,
...,
key9
)
.agg(
// several counts, nothing specially intensive computationally.
)
.cache
Этот DataFrame содержит сгруппированные данные, которые впоследствии будут отфильтрованы по одному из тех ключей , которые ранее использовались в groupBy.
Эта фильтрация происходит здесь.
for (df <- getFilteredDFs(groupedDF)) {
// DB insertion takes places here, with df.foreach each of the Rows are inserted.
}
Это диаграмма для некоторых этапов.
Начиная снизу, первые три этапа - кэш (0), собрать (1) и собрать (2). Остальные элементы являются foreach с ранее упомянутыми логами c.
Однако, точка имеет определенное значение для ключа (который содержит много данных), что приводит к следующему foreach.
Почему огромный разлив памяти? Это вызвано перекосом данных? И что более важно, есть ли способ это исправить?