Короче говоря, запись нескольких выходных файлов из одной задачи не распараллеливается, но при условии, что у вас есть несколько задач (несколько входных разбиений), каждая из них получит свое ядро на рабочем.
Целью записи разделенных данных не является распараллеливание вашей операции записи. Spark уже делает это, одновременно выписывая несколько задач одновременно. Цель состоит в том, чтобы просто оптимизировать будущие операции чтения, для которых требуется только один раздел сохраненных данных.
Логика записи разделов в Spark предназначена для чтения всех записей предыдущего этапа только один раз при записи их в место назначения. Я считаю, что частью выбора дизайна также является защита от случая, когда
ключ раздела имеет много значений.
РЕДАКТИРОВАТЬ: метод Spark 2.x
В Spark 2.x он сортирует записи в каждой задаче по ключам секционирования, а затем перебирает их, записывая в один дескриптор файла за раз. Я предполагаю, что они делают это, чтобы гарантировать, что они никогда не откроют огромное количество файловых дескрипторов, если в ключах вашего раздела много разных значений.
Для справки, вот сортировка:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121
Прокрутите немного вниз, и вы увидите, что он вызывает write(iter.next())
, проходящий по каждой строке.
А вот фактическая запись (один ключ файла / раздела за раз):
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121
Там вы можете видеть, что за один раз открыт только один дескриптор файла.
РЕДАКТИРОВАТЬ: метод Spark 1.x
Что делает spark 1.x для данной задачи, так это циклически перебирает все записи, открывая новый дескриптор файла, когда сталкивается с новым выходным разделом, которого не видел ранее для этой задачи. Затем он немедленно записывает запись в этот дескриптор файла и переходит к следующему. Это означает, что в любой момент времени при обработке одной задачи может быть открыто до N файловых дескрипторов только для этой одной задачи, где N - максимальное количество выходных разделов. Чтобы прояснить ситуацию, вот некоторый псевдо-код на python, чтобы показать общую идею:
# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
partition_path = determine_output_path(row, partition_keys)
if partition_path not in handles:
handles[partition_path] = open(partition_path, 'w')
handles[partition_path].write(row)
Существует предостережение в отношении вышеуказанной стратегии записи записей. В spark 1.x настройка spark.sql.sources.maxConcurrentWrites
устанавливает верхний предел для дескрипторов файла маски, который может быть открыт для каждой задачи. После этого Spark вместо этого сортирует данные по ключу раздела, чтобы можно было перебирать записи по одному файлу за раз.