ОБНОВЛЕННЫЙ ОТВЕТ:
Я нашел гораздо более простой способ сделать это, используя dbutils.fs.put
.Вам нужно было бы перебрать каждую строку вашего DataFrame, вызывая dbutils.fs.put () для каждой строки.
Предполагается, что ваш входной файл (предполагается, CSV) с двумя столбцами выглядит примерно так:
filepath, stringValue
wasbs://container@myaccount.blob.core.windows.net/demo1.txt,"demo string 1"
wasbs://container@myaccount.blob.core.windows.net/demo2.txt,"demo string 2"
wasbs://container@myaccount.blob.core.windows.net/demo3.txt,"demo string 3"
wasbs://container@myaccount.blob.core.windows.net/demo4.txt,"demo string 4"
wasbs://container@myaccount.blob.core.windows.net/demo5.txt,"demo string 5"
Вы можете использовать следующее для циклического прохождения каждой строки во входном кадре данных:
df = spark.read.option("header", True).csv("wasbs://container@myaccount.blob.core.windows.net/demo-data.csv")
rowList = df.rdd.collect()
for row in rowList:
dbutils.fs.put(str(row[0]), str(row[1]), True)
Метод put записывает данную строку в файл, закодированный в UTF-8, поэтому, используя это, вы можете перебирать каждую запись в вашем DataFrame, передавая первый столбец как путь к файлу, а второй какСодержимое строки для записи в файл.
Преимущество также заключается в записи строки в один файл, поэтому вам не нужно проходить процесс переименования и перемещения файлов.
СТАРЫЙ ОТВЕТ:
Из-за распределенной природы Spark запись DataFrame в файлы приводит к созданию каталога, который будет содержать несколько файлов.Вы можете использовать coalesce
для принудительной обработки одного работника и файла, имя которого начинается с part-0000
.
ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: Рекомендуется только для небольших файлов, так как большие файлы данных могут привести к исключениям нехватки памяти.
Чтобы выполнить то, что вы пытаетесь, вам нужно будет пройти по каждой строкеваш DataFrame, создавая новый DataFrame для каждой строки, который содержит только строковое значение, которое вы хотите записать в файл.
Предположим, что ваш входной файл (предположительно CSV) с двумя столбцами выглядит примерно так:
filepath, stringValue
wasbs://container@myaccount.blob.core.windows.net/demo1,"demo string 1"
wasbs://container@myaccount.blob.core.windows.net/demo2,"demo string 2"
wasbs://container@myaccount.blob.core.windows.net/demo3,"demo string 3"
wasbs://container@myaccount.blob.core.windows.net/demo4,"demo string 4"
wasbs://container@myaccount.blob.core.windows.net/demo5,"demo string 5"
Вы можете использовать следующее для циклического прохождения каждой строки во входном кадре данных:
from pyspark.sql import *
from pyspark.sql.types import StringType
df = spark.read.option("header", True).csv("wasbs://container@myaccount.blob.core.windows.net/demo-data.csv")
rowList = df.rdd.collect()
for row in rowList:
dfRow = spark.createDataFrame([str(row[1])], StringType())
dfRow.coalesce(1).write.mode("overwrite").text(row[0])
Это приведет к созданию каталогов в контейнере учетной записи Blob Storage с именами demo1, demo2, demo3, demo4 и demo5.Каждый из них будет содержать несколько файлов.Файл в каждом каталоге, имя которого начинается с part-0000
, является файлом, который будет содержать ваше строковое значение.
Если вам нужно, чтобы эти файлы имели разные имена и находились в другом месте, вы можете использовать dbutils.fs
методы для перемещения файлов и переименования.Вы также можете использовать эту функцию для очистки любых созданных каталогов, если это необходимо.