Я использую spark для чтения нескольких маленьких файлов. Каждый файл имеет специфический для клиента формат и содержит несколько таблиц (каждая из которых имеет свою структуру). Мы создали анализатор python, который работает и обрабатывает разбиение по заданному пути. Позвольте мне объяснить с помощью схемы:
folder
|- file_number=0001
|- year=2019
|- month=10
|- day=21
|- hour=17
|- file.txt
|- file_number=0002
|- year=2019
|- month=10
|- day=21
|- hour=17
|- file.txt
etc
.
.
.
Итак, наивный подход таков:
sc.wholeTextFiles('/path/to/file_number=*/year=*/month=*/day=*/hour=*/*.txt')\ # This is a pair (path, file Content)
.flatMap(lambda x: parser(x[1], x[0]))\ # This is the parser function. Is plain python and works fast. We use the path to pick up the partitioning. The parser returns a list of tuples that's why flatMap
.foldByKey([], lambda x, y: x + y) # The key is the table name and the value is the data as a list of dicts in a tabular-like style
Преобразование .wholeTextFiles('/path/to/file_number=*/year=*/month=*/day=*/hour=*/*.txt')
занимает безумное количество времени, учитывая, что остальное занимает не так много,
До этого блога, проблема может быть в каком-то рекурсивном вызове, поэтому гораздо лучше сначала перечислить все файлы, а затем прочитать каждый файл. Я не могу использовать FileSystem.open(path)
Hadoop, как предлагается в ссылке, потому что я работаю над Azure Data Lake Gen2. Но это правда, что перечисление всех файлов с использованием dbutlis.fs
выполняется быстро.
Итак, вопрос: Как я могу использовать такой список для параллельного чтения и анализа файлов? . Проблема в том, что wholeTextFile
не принимает список в качестве аргумента. Я пробовал все следующее:
list_of_all_files_paths = dbutils.someCode()
# Attempt 1: Type mismatch
rdd = sc.wholeTextFile(list_of_all_files_paths)
# Attempt 2: Works but all partitiong info is lost
rdd = spark.read.text(list_of_all_files_paths, wholetext=True)
# Attempt 3: Takes a lot of time too
rdd = spark.read.text('path/to/')
# Attempt 3: This is the most succesfull approach... but looks sooo bad, and is not very fast neither...
rdd = sc.emptyRDD()
for path in list_of_all_files_paths:
newRDD = sc.wholeTextFiles(path)
rdd = rdd.union(newRDD)