Как загрузить разные файлы в разные таблицы, основываясь на шаблоне файла? - PullRequest
0 голосов
/ 01 октября 2019

Я запускаю простой скрипт PySpark, как этот.

base_path = '/mnt/rawdata/'
file_names = ['2018/01/01/ABC1_20180101.gz',
               '2018/01/02/ABC2_20180102.gz',
               '2018/01/03/ABC3_20180103.gz',
               '2018/01/01/XYZ1_20180101.gz'
               '2018/01/02/XYZ1_20180102.gz']

for f in file_names:
  print(f)

Итак, просто протестировав это, я могу найти файлы и распечатать строки просто отлично. Теперь я пытаюсь выяснить, как загрузить содержимое каждого файла в определенную таблицу в SQL Server. Дело в том, что я хочу сделать поиск по шаблону для файлов, которые соответствуют шаблону, и загрузить определенные файлы в определенные таблицы. Итак, я хотел бы сделать следующее:

1) load all files with 'ABC' in the name, into my 'ABC_Table' and all files with 'XYZ' in the name, into my 'XYZ_Table' (all data starts on row 2, not row 1)
2) load the file name into a field named 'file_name' in each respective table (I'm totally fine with the entire string from 'file_names' or the part of the string after the last '/' character; doesn't matter)

Я пытался использовать для этого фабрику данных Azure, и она может рекурсивно циклически перебирать все файлы, но не загружает имена файлови мне действительно нужны имена файлов в таблице, чтобы различать, какие записи приходят, какие файлы и даты. Можно ли сделать это с помощью Azure Databricks? Я чувствую, что это выполнимый процесс ETL, но я недостаточно знаю об АБР, чтобы сделать эту работу. Я был бы очень признателен за любую / все помощь с этим.

ОБНОВЛЕНИЕ (на основе вашей рекомендации Даниэль):

dfCW = sc.sequenceFile('/mnt/rawdata/2018/01/01/ABC%.gz/').toDF()
dfCW.withColumn('input', input_file_name())
print(dfCW)

Дает мне:

com.databricks.backend.daemon.data.common.InvalidMountException:

Но .. Я чувствую, что это может быть близко. Мысли? Предложения?

1 Ответ

1 голос
/ 01 октября 2019

Вы можете использовать input_file_name из pyspark.sql.functions, например,

withFiles = df.withColumn("file", input_file_name())

После этого вы можете создать несколько фреймов данных, отфильтровав новый столбец

abc = withFiles.filter(col("file").like("%ABC%"))
xyz = withFiles.filter(col("file").like("%XYZ%"))

, а затем использовать обычную запись дляоба из них.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...