Переименование искрового выхода CSV в хранилище Azure BLOB - PullRequest
0 голосов
/ 06 декабря 2018

У меня есть записная книжка Databricks, которая работает следующим образом:

  • Сведения о подключении pyspark к учетной записи хранения BLOB-объектов
  • Чтение файла через фрейм данных spark
  • преобразование вpandas Df
  • моделирование данных на pandas Df
  • преобразование в искровую Df
  • запись в хранилище больших двоичных объектов в одном файле

Моя проблема в том, чтоВы не можете назвать файл выходного файла, где мне нужно статическое имя файла CSV.

Есть ли способ переименовать это в pyspark?

## Blob Storage account information
storage_account_name = ""
storage_account_access_key = ""

## File location and File type
file_location = "path/.blob.core.windows.net/Databricks_Files/input"
file_location_new = "path/.blob.core.windows.net/Databricks_Files/out"
file_type = "csv"

## Connection string to connect to blob storage
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

С последующим выводом файла после преобразования данных

dfspark.coalesce(1).write.format('com.databricks.spark.csv') \
  .mode('overwrite').option("header", "true").save(file_location_new)

Где файл затем записывается как "part-00000-tid-336943946930983 ..... csv"

Где целью является "Output.csv"

ДругойПодход, который я рассмотрел, просто воссоздает это в python, но еще не встречал в документации о том, как вывести файл обратно в хранилище BLOB-объектов.

Я знаю, что метод извлечения из хранилища BLOB-объектов - .get_blob_to_path через microsoft.docs

Любая помощь здесь очень ценится.

1 Ответ

0 голосов
/ 13 декабря 2018

Hadoop / Spark будет параллельно выводить результат вычисления для каждого раздела в один файл, поэтому вы увидите много файлов part-<number>-.... в пути вывода HDFS, например, Output/, названный вами.

Если вы хотитевывести все результаты вычислений в один файл, вы можете объединить их с помощью команды hadoop fs -getmerge /output1/part* /output2/Output.csv или установить число процессов сокращения с помощью 1, например, используя функцию coalesce(1).

Итак, в вашем сценарии вынужно только настроить порядок вызова этих функций, чтобы функция coalease вызывалась в начале функции save, как показано ниже.

dfspark.write.format('com.databricks.spark.csv') \
  .mode('overwrite').option("header", "true").coalesce(1).save(file_location_new)
...