Максимальный размер сообщения на Azure Databricks - PullRequest
0 голосов
/ 06 февраля 2020

Я использую блоки данных с python на Azure для обработки моих данных. Результат этого процесса будет сохранен как CSV-файл на azure хранилище BLOB-объектов.

, но здесь есть проблема. когда размер файла результатов превышает 750 Мб, произошла ошибка.

после некоторого исследования в Google, я понял, что должен увеличить свой Scala .r c .message.maxSize, и я сделал это , проблема заключается в том, что максимальный размер, который я могу установить, составляет всего 2 ГБ, и, поскольку я использую блоки данных для анализа больших данных, я ожидаю, что файл будет намного больше 2 ГБ.

вопрос:

  1. означает, что 2 ГБ - это максимальный размер сообщения, поддерживаемый Azure Блоки данных? Я попытался выполнить поиск и go в официальном документе от Microsoft, но не могу найти какую-либо информацию по этому поводу.

  2. Есть ли способ повысить значение? или даже установить его в масштабируемое значение зависит от моих данных.

    1016 *

вот мой python код для этого процесса.

#mount azure storage to my databricks
dbutils.fs.mount(
  source = "wasbs://mystoragecontainer.blob.core.windows.net",
  mount_point = "/mnt/test3",
  extra_configs = {"fs.azure.account.key.mystoragecontainer.blob.core.windows.net":dbutils.secrets.get(scope = "myapps", key = "myappskey")})


#define saving process in a function
def save_data(df, savefile):
  df.coalesce(1).write.mode("overwrite").options(header="true").format("com.databricks.spark.csv").save(savefile)
  res = savefile.split('/')
  ls_target = savefile.rstrip(res[-1])
  dbutils.fs.ls(savefile+"/")
  fileList = dbutils.fs.ls(savefile+"/")
  target_name = ""
  for item in fileList:
    if item.name.endswith("csv"):
      filename= item.path
      target_parts = filename.split('/')
      target_name = filename.replace('/'+target_parts[-2]+'/', '/')
      print(target_name)
      dbutils.fs.mv(filename, ls_target)
    else:
      filename= item.path
      dbutils.fs.rm(filename, True)
  dbutils.fs.rm(savefile, True)
  dbutils.fs.mv(target_name, savefile)

# call my save function
save_data(df,"dbfs:/mnt/test3/myfolderpath/japanese2.csv")

любая информация будет оценена.

бест,

1 Ответ

1 голос
/ 07 февраля 2020

Если я правильно понимаю, вы хотите объединить распределенный CSV, сгенерированный следующим образом:

df.coalesce(1).write.mode("overwrite").options(header="true").format("com.databricks.spark.csv").save(savefile) 

Я бы посоветовал вам попытаться преобразовать его в pandas фрейм данных и записать в один CSV, как показано ниже:

# call my save function
df.toPandas().to_csv("/dbfs/mnt/test3/myfolderpath/japanese2.csv")

Это должно записать один CSV, содержащий все данные в вашем фрейме данных. Будьте осторожны при использовании /dbfs/ при использовании Pandas, так как он использует файловый API вместо API DBFS.

Кроме того, это pySpark, а не scala.

...