Так что вопрос в теме. Я думаю, что я не правильно понимаю работу передела. По моему мнению, когда я говорю somedataset.repartition(600)
, я ожидаю, что все данные будут распределены по рабочим группам одинакового размера (скажем, 60 рабочих).
Так, например. Я хотел бы иметь большой кусок данных для загрузки в несбалансированные файлы, скажем, 400 файлов, где 20% имеют размер 2 ГБ, а другие 80% около 1 МБ. У меня есть код для загрузки этих данных:
val source = sparkSession.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("delimiter","\t")
.load(mypath)
Чем я хочу преобразовать необработанные данные в мой промежуточный объект, отфильтровать нерелевантные записи, преобразовать в конечный объект (с дополнительными атрибутами), а затем разделить на несколько столбцов и записать в паркет. На мой взгляд, разумно сбалансировать данные (40000 разделов) между работниками, а затем выполнить такую работу:
val ds: Dataset[FinalObject] = source.repartition(600)
.map(parse)
.filter(filter.IsValid(_))
.map(convert)
.persist(StorageLevel.DISK_ONLY)
val count = ds.count
log(count)
val partitionColumns = List("region", "year", "month", "day")
ds.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns:_*)
.format("parquet")
.mode(SaveMode.Append)
.save(destUrl)
Но это не так с
ExecutorLostFailure (выход executor 7 вызван одним из запущенных
Задачи) Причина: контейнер, убитый YARN за превышение пределов памяти.
34,6 ГБ из 34,3 ГБ физической памяти. Рассмотрите возможность повышения spark.yarn.executor.memoryOverhead.
Когда я не делаю перераспределение, все в порядке. Где я не правильно понимаю передел?