Измените спарк _временный путь к каталогу, чтобы избежать удаления паркетов - PullRequest
0 голосов
/ 19 марта 2020

Если два или более заданий Spark имеют одинаковый выходной каталог, взаимное удаление файлов будет неизбежным.

Я пишу фрейм данных в режиме добавления с помощью spark 2.4.4 и хочу добавить временную метку в каталог tmp spark, чтобы избежать их удаления.

пример:

my JobSpark записать в hdfs:/outputFile/0/tmp/file1.parquet

то же самое искровое задание, вызванное другими данными, и записать в hdfs:/outputFil/0/tm/file2.parquet

Я хочу jobSpark1 записать в hdfs:/outputFile/0/tmp+(timeStamp)/file1.parquet, а другую работу записать в hdfs:/outputFile/0/tmp+(timeStamp)/file2.parquet и затем переместить паркет в hdfs: / outputFile /

1 Ответ

0 голосов
/ 19 марта 2020
df
  .write
        .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
        .partitionBy("XXXXXXXX")
        .mode(SaveMode.Append)
        .format(fileFormat)
        .save(path)

Когда Spark добавляет данные в существующий набор данных, Spark использует FileOutputCommitter для управления промежуточными выходными файлами и конечными выходными файлами. Поведение FileOutputCommitter напрямую влияет на производительность заданий, которые записывают данные.

FileOutputCommitter имеет два метода commitTask и commitJob. Apache Spark 2.0 и более поздние версии используют Apache Had oop 2, который использует значение mapreduce.fileoutputcommitter.algorithm.version для управления работой commitTask и commitJob. В Had oop 2 значение mapreduce.fileoutputcommitter.algorithm.version по умолчанию равно 1. Для этой версии commitTask перемещает данные, сгенерированные задачей, из временного каталога задачи во временный каталог задания, а когда все задачи завершены, commitJob перемещает данные из рабочего временного каталога в конечный пункт назначения.

Поскольку драйвер выполняет работу commitJob, для облачного хранилища эта операция может занять много времени. Вы можете часто думать, что ваша камера «висит». Тем не менее, когда значение mapreduce.fileoutputcommitter.algorithm.version равно 2, commitTask перемещает данные, сгенерированные задачей, непосредственно в конечный пункт назначения, а commitJob в основном не используется.

...