Высокий разлив памяти в случайном порядке (+ 100 ГБ), когда чтение в случайном порядке составляет 5,0 ГБ - PullRequest
0 голосов
/ 07 февраля 2020

Что касается контекста iself, я пытаюсь загрузить данные .parquet из S3 в кластер Spark с 1 ядром драйвера (2 ГБ памяти), 4 исполнителями x 2 ядра (2 ГБ памяти) и по Speci c Для каждой итерации обработка выполняется слишком медленно, поэтому для ее завершения требуется + 24 часа (подробнее об этом ниже).

Процесс идет следующим образом (псевдокод).

s3Data
  .groupBy(
      key1,
      key2,
      ...,
      key9
  )
  .agg(
      // several counts, nothing specially intensive computationally.
  )
  .cache

Этот DataFrame содержит сгруппированные данные, которые впоследствии будут отфильтрованы по одному из тех ключей , которые ранее использовались в groupBy.

Эта фильтрация происходит здесь.

for (df <- getFilteredDFs(groupedDF)) {
    // DB insertion takes places here, with df.foreach each of the Rows are inserted.
}

Это диаграмма для некоторых этапов.

Spark UI

Начиная снизу, первые три этапа - кэш (0), собрать (1) и собрать (2). Остальные элементы являются foreach с ранее упомянутыми логами c.

Однако, точка имеет определенное значение для ключа (который содержит много данных), что приводит к следующему foreach.

Spark UI for a given foreach

Почему огромный разлив памяти? Это вызвано перекосом данных? И что более важно, есть ли способ это исправить?

...