Достигнуть параллелизма при сохранении в паркетный файл с разделами - PullRequest
0 голосов
/ 26 июня 2018

При записи dataframe в parquet с использованием partitionBy:

df.write.partitionBy("col1","col2","col3").parquet(path)

Я бы ожидал, что каждый записываемый раздел выполнялся независимо отдельной задачей и параллельно количеству рабочих, назначенных на текущее искровое задание.

Однако при записи в паркет фактически выполняется только один рабочий / задание. Этот рабочий просматривает каждый раздел и последовательно записывает файлы .parquet. Почему это так - и есть ли способ заставить параллелизм в этой spark.write.parquet операции?

Ниже указано , а не , что я хочу видеть (должно быть 700%+ ..)

enter image description here

Из этого другого поста я также попытался добавить repartition впереди

Spark паркетная перегородка: большое количество файлов

df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)

К сожалению, это не имело никакого эффекта: все еще один рабочий ..

Примечание. Я работаю в режиме local с local[8] и видел, как другие операции искры выполняются с восемью одновременными рабочими и используют до 750% процессорных ресурсов.

1 Ответ

0 голосов
/ 27 июня 2018

Короче говоря, запись нескольких выходных файлов из одной задачи не распараллеливается, но при условии, что у вас есть несколько задач (несколько входных разбиений), каждая из них получит свое ядро ​​на рабочем.

Целью записи разделенных данных не является распараллеливание вашей операции записи. Spark уже делает это, одновременно выписывая несколько задач одновременно. Цель состоит в том, чтобы просто оптимизировать будущие операции чтения, для которых требуется только один раздел сохраненных данных.

Логика записи разделов в Spark предназначена для чтения всех записей предыдущего этапа только один раз при записи их в место назначения. Я считаю, что частью выбора дизайна также является защита от случая, когда ключ раздела имеет много значений.

РЕДАКТИРОВАТЬ: метод Spark 2.x

В Spark 2.x он сортирует записи в каждой задаче по ключам секционирования, а затем перебирает их, записывая в один дескриптор файла за раз. Я предполагаю, что они делают это, чтобы гарантировать, что они никогда не откроют огромное количество файловых дескрипторов, если в ключах вашего раздела много разных значений.

Для справки, вот сортировка:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

Прокрутите немного вниз, и вы увидите, что он вызывает write(iter.next()), проходящий по каждой строке.

А вот фактическая запись (один ключ файла / раздела за раз):

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

Там вы можете видеть, что за один раз открыт только один дескриптор файла.

РЕДАКТИРОВАТЬ: метод Spark 1.x

Что делает spark 1.x для данной задачи, так это циклически перебирает все записи, открывая новый дескриптор файла, когда сталкивается с новым выходным разделом, которого не видел ранее для этой задачи. Затем он немедленно записывает запись в этот дескриптор файла и переходит к следующему. Это означает, что в любой момент времени при обработке одной задачи может быть открыто до N файловых дескрипторов только для этой одной задачи, где N - максимальное количество выходных разделов. Чтобы прояснить ситуацию, вот некоторый псевдо-код на python, чтобы показать общую идею:

# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
    partition_path = determine_output_path(row, partition_keys)
    if partition_path not in handles:
        handles[partition_path] = open(partition_path, 'w')

    handles[partition_path].write(row)

Существует предостережение в отношении вышеуказанной стратегии записи записей. В spark 1.x настройка spark.sql.sources.maxConcurrentWrites устанавливает верхний предел для дескрипторов файла маски, который может быть открыт для каждой задачи. После этого Spark вместо этого сортирует данные по ключу раздела, чтобы можно было перебирать записи по одному файлу за раз.

...