Как выполнить итерацию в Databricks для чтения сотен файлов, хранящихся в разных подкаталогах в Data Lake? - PullRequest
0 голосов
/ 17 июня 2020

Мне нужно прочитать сотни файлов avro в Databricks из Azure Data Lake Gen2, извлечь данные из поля Body внутри каждого файла и объединить все извлеченные данные в уникальный фрейм данных. Дело в том, что все файлы avro для чтения хранятся в разных подкаталогах в озере, следуя шаблону:

root / YYYY / MM / DD / HH / mm / ss.avro

Это вынуждает меня oop принимать и выбирать данные. Я использую этот код Python, в котором list_avro_files - это список путей ко всем файлам:

list_data = []

for file_avro in list_avro_files:
  df = spark.read.format('avro').load(file_avro)
  data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
  list_data.append(data1)

data = reduce(DataFrame.unionAll, list_data)

Есть ли способ сделать это более эффективно? Как я могу распараллелить / ускорить этот процесс?

1 Ответ

3 голосов
/ 17 июня 2020

Пока ваш list_avro_files может быть выражен с помощью стандартного синтаксиса подстановочных знаков, вы, вероятно, можете использовать собственную способность Spark распараллеливать операции чтения. Все, что вам нужно, это указать basepath и шаблон имени файла для ваших файлов avro:

scala> var df = spark.read
                 .option("basepath","/user/hive/warehouse/root")
                 .format("avro")
                 .load("/user/hive/warehouse/root/*/*/*/*.avro")

И, если вы обнаружите, что вам нужно точно знать, какой файл любая заданная строка взята из, используйте встроенную функцию input_file_name() для обогащения вашего фрейма данных:

scala> df = df.withColumn("source",input_file_name())
...