Метод динамического создания большого фрейма данных (искры или pandas) для экспорта в csv - PullRequest
0 голосов
/ 12 июля 2020

У меня есть CSV, который я импортирую в блоки данных с помощью spark.read. Этот большой файл содержит записи / транзакции на ежедневном уровне. Я обрезаю фрейм данных до 5 столбцов и оставляю 500 000 строк как есть. Я пытаюсь создать сводную таблицу этого исходного файла, которая представляет эти записи / транзакции на уровне месяца (агрегат).

В сценарии есть команда filter / groupby / sum, которая возвращает одну строку, которая суммирует данные в счет на месяц. Строка, возвращаемая запросом, будет выглядеть следующим образом:

+---------+---------+-------+-------------+
|  Country|StockCode|YYYY-MM|sum(Quantity)|
+---------+---------+-------+-------------+
|Singapore|        M| 2011-4|           10|
+---------+---------+-------+-------------+

сценарий выполняет итерацию по исходному фрейму данных и возвращает каждый раз. У меня возникли проблемы с использованием вывода (отображения или экспорта в CSV) этого скрипта. И в pyspark, и в pandas у меня проблемы. Я не уверен, как сложить результат запроса и в какой форме он должен быть?

# Pandas Если я сделаю это в pandas, скрипту потребуется очень много времени для создания файла (я считаю pandas + я делаю это не так эффективно, увеличиваю продолжительность) ~ 2,5 часа. Команды display и write.csv работают довольно быстро и выполняются примерно за несколько секунд.

# Pyspark Если я сделаю это в pyspark, скрипт займет около 10 минут, но отображение и экспорт cra sh. Ноутбук либо возвращает ошибку тайм-аута, перезагружается, либо выдает ошибки cra sh.

Следует ли подходить к созданию списка списков динамически, а когда он будет полностью построен, преобразовать его в фрейм данных для использования? Я пробовал все способы, с которыми сталкивался, и, похоже, не добился никакого прогресса.

Вот код, который генерирует результаты

#officeSummaryDFBefore
column_names = "Country|StockCode|YYYY-MM|Quantity"
monthlyCountsBeforeImpactDate = spark.createDataFrame(
  [
    tuple('' for i in column_names.split("|"))
  ],
  column_names.split("|")
).where("1=0")

monthlyCountsBeforeImpacteDateRow = spark.createDataFrame(
  [
    tuple('' for i in column_names.split("|"))
  ],
  column_names.split("|")
).where("1=0")

try :
  for country in country_lookup :
    country = country[0]
    print(country_count, " country(s) left")
    country_count = country_count - 1
    for stockCode in stockCode_lookup :
      stockCode = stockCode[0]
      monthlyCountsBeforeImpacteDateRow = dataBeforeImpactDate.filter((col("Country").rlike(country)) & (col("StockCode").rlike(stockCode))).groupby("Country", "StockCode", "YYYY-MM").sum()
      monthlyCountsBeforeImpacteDateRow.show()
      dfsCountsBefore = [monthlyCountsBeforeImpacteDateRow, monthlyCountsBeforeImpactDate]
      monthlyCountsBeforeImpactDate = reduce(DataFrame.union, dfsCountsBefore)
      
except Exception as e:
  print(e) 

Я объявляю dfsCountsBeforeImpactDate внутри l oop что не кажется правильным, но когда он выходит за пределы, возвращается как NULL.

1 Ответ

1 голос
/ 12 июля 2020

IIUC. Вы выполняете поиск по странам и запасам, чтобы ограничить строки, а затем группируете их для генерации агрегатов.

Почему бы не отфильтровать df полностью, а затем сгруппировать будет намного быстрее, поскольку вы не будете зацикливаться на фильтре, а также не нуждаетесь в объединении.

...