Ускоренный подход к поиску целых текстовых файлов для нескольких неформатированных файлов. PySpark - PullRequest
0 голосов
/ 21 октября 2019

Я использую spark для чтения нескольких маленьких файлов. Каждый файл имеет специфический для клиента формат и содержит несколько таблиц (каждая из которых имеет свою структуру). Мы создали анализатор python, который работает и обрабатывает разбиение по заданному пути. Позвольте мне объяснить с помощью схемы:

folder
|- file_number=0001
   |- year=2019
      |- month=10
         |- day=21
            |- hour=17
               |- file.txt
|- file_number=0002
   |- year=2019
      |- month=10
         |- day=21
            |- hour=17
               |- file.txt
etc
.
.
.

Итак, наивный подход таков:

sc.wholeTextFiles('/path/to/file_number=*/year=*/month=*/day=*/hour=*/*.txt')\ # This is a pair (path, file Content)
  .flatMap(lambda x: parser(x[1], x[0]))\ # This is the parser function. Is plain python and works fast. We use the path to pick up the partitioning. The parser returns a list of tuples that's why flatMap
  .foldByKey([], lambda x, y: x + y)      # The key is the table name and the value is the data as a list of dicts in a tabular-like style

Преобразование .wholeTextFiles('/path/to/file_number=*/year=*/month=*/day=*/hour=*/*.txt') занимает безумное количество времени, учитывая, что остальное занимает не так много,

До этого блога, проблема может быть в каком-то рекурсивном вызове, поэтому гораздо лучше сначала перечислить все файлы, а затем прочитать каждый файл. Я не могу использовать FileSystem.open(path) Hadoop, как предлагается в ссылке, потому что я работаю над Azure Data Lake Gen2. Но это правда, что перечисление всех файлов с использованием dbutlis.fs выполняется быстро.

Итак, вопрос: Как я могу использовать такой список для параллельного чтения и анализа файлов? . Проблема в том, что wholeTextFile не принимает список в качестве аргумента. Я пробовал все следующее:

list_of_all_files_paths = dbutils.someCode()

# Attempt 1: Type mismatch
rdd = sc.wholeTextFile(list_of_all_files_paths)

# Attempt 2: Works but all partitiong info is lost
rdd = spark.read.text(list_of_all_files_paths, wholetext=True)

# Attempt 3:  Takes a lot of time too
rdd = spark.read.text('path/to/')

# Attempt 3: This is the most succesfull approach... but looks sooo bad, and is not very fast neither...
rdd = sc.emptyRDD()
for path in list_of_all_files_paths:
  newRDD = sc.wholeTextFiles(path)
  rdd    = rdd.union(newRDD)

1 Ответ

1 голос
/ 21 октября 2019

Как @jxc ответил в комментариях. Решение очень простое:

rdd = sc.wholeTextFile(','.join(list_of_all_files_paths))

Оказывается, что строка, представляющая список путей, является допустимым вводом. Время все еще велико из-за суммы I/O, но по крайней мере часть списка стала очень маленькой

...