Я получил тысячи файлов, которые нужно проанализировать с помощью определенного синтаксического анализатора клиента.Для каждого файла парсер генерирует словарь TableName -> pandas.DataFrame
.Я следовал приведенному ниже подходу, который оказывается очень медленным.
def processDF(filePath: str, fileContent: str):
"""Parses the file as a string and generates a dictionary"""
some code...
return result # The type of result is [(tableName, pandas.DataFrame)]
# This RDD contains tuples of the form (TableName, pd.DataFrame)
rdd = sc. \
wholeTextFiles('/path/to/file/id=*/year=*/month=*/day=*/hour=*/*'). \ # A pair (Path, Content)
filter(lambda x: 'Error' not in x[0]). \ # Just filter when path contains Error
flatMap(lambda x: processDF(x[0], x[1])). \ # For each x, generates a list of pairs (TableName, pd.DataFrame), hence flatmap
reduceByKey(lambda a, b: pd.concat([a,b], ignore_index=True)). \ # Concat all files
cache() # Cache the result.
Выполнение строки выше занимает более 20 минут, поэтому я предполагаю, что либо reduceByKey
, либо cache
действительно выполняют некоторыеДействие, если нет, является полностью сумасшедшим, такое количество времени для ленивого не оцениваемого RDD
Проблемы возникают при записи каждого DataFrame как паркета в файловой системе Azure:
# structDict is a dictionary TableName -> StructType
for k, struct in structDict.keys():
df = rdd.filter(lambda x: x[0] == k).map(lambda x: x[1]).toDF(struct)
writeAsParquet(df, k, struct) # just like df.write.parquet but with some business logic
Thisзанять безумное количество времени, что не представляется разумным, так как сложная часть вычислений кэшировалась ранее, и для каждого ключа размер dataFrame не так уж велик ... около 1,5 миллионов строк вхудший случай
Очевидно, что rdd
, похоже, вообще не кэшируется, но тогда почему это занимает так много времени?Что мне здесь не хватает?