Почему перераспределение Spark приводит к MemoryOverhead? - PullRequest
0 голосов
/ 28 июня 2018

Так что вопрос в теме. Я думаю, что я не правильно понимаю работу передела. По моему мнению, когда я говорю somedataset.repartition(600), я ожидаю, что все данные будут распределены по рабочим группам одинакового размера (скажем, 60 рабочих).

Так, например. Я хотел бы иметь большой кусок данных для загрузки в несбалансированные файлы, скажем, 400 файлов, где 20% имеют размер 2 ГБ, а другие 80% около 1 МБ. У меня есть код для загрузки этих данных:

val source = sparkSession.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("delimiter","\t")
  .load(mypath)

Чем я хочу преобразовать необработанные данные в мой промежуточный объект, отфильтровать нерелевантные записи, преобразовать в конечный объект (с дополнительными атрибутами), а затем разделить на несколько столбцов и записать в паркет. На мой взгляд, разумно сбалансировать данные (40000 разделов) между работниками, а затем выполнить такую ​​работу:

val ds: Dataset[FinalObject] = source.repartition(600)
  .map(parse)
  .filter(filter.IsValid(_))
  .map(convert)
  .persist(StorageLevel.DISK_ONLY)
val count = ds.count
log(count)
val partitionColumns = List("region", "year", "month", "day")

ds.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)):_*)
  .write.partitionBy(partitionColumns:_*)
  .format("parquet")
  .mode(SaveMode.Append)
  .save(destUrl)

Но это не так с

ExecutorLostFailure (выход executor 7 вызван одним из запущенных Задачи) Причина: контейнер, убитый YARN за превышение пределов памяти. 34,6 ГБ из 34,3 ГБ физической памяти. Рассмотрите возможность повышения spark.yarn.executor.memoryOverhead.

Когда я не делаю перераспределение, все в порядке. Где я не правильно понимаю передел?

1 Ответ

0 голосов
/ 28 июня 2018

Ваша логика верна как для repartition, так и для partitionBy, но перед использованием repartition вы должны помнить об этом из нескольких источников.

Имейте в виду, что перераспределение ваших данных довольно дорого операция. Spark также имеет оптимизированную версию repartition (), которая называется coalesce (), которая позволяет избежать перемещения данных, но только если вы уменьшение количества разделов СДР.

Если вы хотите, чтобы ваша задача была выполнена, увеличьте объем памяти драйверов и исполнителей

...