Spark Dataframe Write to CSV создает _временный файл каталога в режиме автономного кластера - PullRequest
0 голосов
/ 30 августа 2018

Я запускаю spark job в кластере, который имеет 2 рабочих узла! Я использую код ниже (искра Java) для сохранения вычисленного фрейма данных в виде CSV на рабочих узлах.

dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath); Я пытаюсь понять, как spark записывает несколько файлов деталей на каждом рабочем узле.

Run1) worker1 имеет part files и SUCCESS; worker2 имеет _temporarty/task*/part* для каждой задачи выполняются файлы детали.

Run2) worker1 содержит файлы деталей, а также каталог _temporary; worker2 имеет multiple part files

Может кто-нибудь помочь мне понять, почему это поведение? 1) Должен ли я рассматривать записи в outputDir/_temporary как часть выходного файла вместе с part files in outputDir?

2) Должен ли _temporary dir быть удален после запуска задания и переместить файлы part в outputDir?

3) почему он не может создавать файлы деталей непосредственно под выходным каталогом?

coalesce(1) и repartition(1) не могут быть опцией, так как сам файл outputDir будет около 500GB

Spark 2.0.2. 2.1.3 и Java 8, no HDFS

Ответы [ 3 ]

0 голосов
/ 10 сентября 2018

TL; DR Для правильной записи (или чтения в этом отношении) данных с использованием источника на основе файловой системы вам потребуется общее хранилище.

Каталог

_temporary является частью базового механизма фиксации, используемого Spark - данные сначала записываются во временный каталог, а после завершения всей задачи атомарно перемещаются в конечный пункт назначения. Вы можете прочитать больше об этом процессе в Spark _ Причина временного создания

Чтобы этот процесс был успешным, вам нужна общая файловая система (HDFS, NFS и т. Д.) Или эквивалентное распределенное хранилище (например, S3). Поскольку у вас его нет, ожидается сбой очистки временного состояния - Сохранение кадра данных в локальной файловой системе приводит к пустым результатам .

Поведение, которое вы наблюдали (данные частично зафиксированы, а частично нет), может возникать, когда некоторые исполнители размещаются вместе с драйвером и совместно используют файловую систему с драйвером, обеспечивая полную фиксацию для подмножества данных.

0 голосов
/ 15 сентября 2018

После анализа заметил, что моя искровая работа использует fileoutputcommitter version 1, что по умолчанию. Затем я включил конфигурацию для использования fileoutputcommitter version 2 вместо version 1 и проверил в 10 узлах искровой автономный кластер в AWS. Все part-* files генерируются непосредственно под outputDirPath, указанным в dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath)

Мы можем установить свойство

  1. Включая то же самое, что и --conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2' в spark-submit command

  2. или задайте свойство с помощью sparkContext javaSparkContext.hadoopConifiguration().set("mapreduce.fileoutputcommitter.algorithm.version","2")

Я понимаю последствия в случае сбоев, описанные в spark docs , но я достиг желаемого результата!

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version , значение по умолчанию равно 1
версия алгоритма коммиттера вывода файла, верная версия алгоритма номер: 1 или 2. Версия 2 может иметь лучшую производительность, но версия 1 может лучше справляться со сбоями в определенных ситуациях MapReduce-4815.

0 голосов
/ 30 августа 2018

Несколько файлов деталей основаны на вашем разделе данных. Количество записанных файлов или данных зависит от количества разделов, которые имеется в DataFrame на момент записи данных. По умолчанию на каждый раздел данных записывается один файл.

вы можете управлять им, используя коалесцию или передел. Вы можете уменьшить раздел или увеличить его.

если вы сделаете объединение 1, вы не увидите в нем несколько файлов деталей, но это повлияет на запись данных в параллель.

[outputDirPath = /tmp/multiple.csv]

dataframe
 .coalesce(1)
 .write.option("header","false")
 .mode(SaveMode.Overwrite)
 .csv(outputDirPath);

на ваш вопрос о том, как на него ссылаться ..

см. /tmp/multiple.csv для всех нижеуказанных частей.

/tmp/multiple.csv/part-00000.csv
/tmp/multiple.csv/part-00001.csv
/tmp/multiple.csv/part-00002.csv
/tmp/multiple.csv/part-00003.csv
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...