PySpark.Кэширование медленное, это limitByKey или это мое невежество? - PullRequest
0 голосов
/ 23 сентября 2019

Я получил тысячи файлов, которые нужно проанализировать с помощью определенного синтаксического анализатора клиента.Для каждого файла парсер генерирует словарь TableName -> pandas.DataFrame.Я следовал приведенному ниже подходу, который оказывается очень медленным.

def processDF(filePath: str, fileContent: str):
   """Parses the file as a string and generates a dictionary"""
   some code...

   return result # The type of result is [(tableName, pandas.DataFrame)]

# This RDD contains tuples of the form (TableName, pd.DataFrame)
rdd = sc. \
  wholeTextFiles('/path/to/file/id=*/year=*/month=*/day=*/hour=*/*'). \ # A pair (Path, Content)
  filter(lambda x: 'Error' not in x[0]). \                              # Just filter when path contains Error
  flatMap(lambda x: processDF(x[0], x[1])). \                           # For each x, generates a list of pairs (TableName, pd.DataFrame), hence flatmap
  reduceByKey(lambda a, b: pd.concat([a,b], ignore_index=True)). \      # Concat all files
  cache()                                                               # Cache the result.

Выполнение строки выше занимает более 20 минут, поэтому я предполагаю, что либо reduceByKey, либо cache действительно выполняют некоторыеДействие, если нет, является полностью сумасшедшим, такое количество времени для ленивого не оцениваемого RDD

Проблемы возникают при записи каждого DataFrame как паркета в файловой системе Azure:

# structDict is a dictionary TableName -> StructType
for k, struct in structDict.keys():
  df = rdd.filter(lambda x: x[0] == k).map(lambda x: x[1]).toDF(struct)
  writeAsParquet(df, k, struct) # just like df.write.parquet but with some business logic

Thisзанять безумное количество времени, что не представляется разумным, так как сложная часть вычислений кэшировалась ранее, и для каждого ключа размер dataFrame не так уж велик ... около 1,5 миллионов строк вхудший случай

Очевидно, что rdd, похоже, вообще не кэшируется, но тогда почему это занимает так много времени?Что мне здесь не хватает?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...