У меня есть CSV, который я импортирую в блоки данных с помощью spark.read. Этот большой файл содержит записи / транзакции на ежедневном уровне. Я обрезаю фрейм данных до 5 столбцов и оставляю 500 000 строк как есть. Я пытаюсь создать сводную таблицу этого исходного файла, которая представляет эти записи / транзакции на уровне месяца (агрегат).
В сценарии есть команда filter / groupby / sum, которая возвращает одну строку, которая суммирует данные в счет на месяц. Строка, возвращаемая запросом, будет выглядеть следующим образом:
+---------+---------+-------+-------------+
| Country|StockCode|YYYY-MM|sum(Quantity)|
+---------+---------+-------+-------------+
|Singapore| M| 2011-4| 10|
+---------+---------+-------+-------------+
сценарий выполняет итерацию по исходному фрейму данных и возвращает каждый раз. У меня возникли проблемы с использованием вывода (отображения или экспорта в CSV) этого скрипта. И в pyspark, и в pandas у меня проблемы. Я не уверен, как сложить результат запроса и в какой форме он должен быть?
# Pandas Если я сделаю это в pandas, скрипту потребуется очень много времени для создания файла (я считаю pandas + я делаю это не так эффективно, увеличиваю продолжительность) ~ 2,5 часа. Команды display и write.csv работают довольно быстро и выполняются примерно за несколько секунд.
# Pyspark Если я сделаю это в pyspark, скрипт займет около 10 минут, но отображение и экспорт cra sh. Ноутбук либо возвращает ошибку тайм-аута, перезагружается, либо выдает ошибки cra sh.
Следует ли подходить к созданию списка списков динамически, а когда он будет полностью построен, преобразовать его в фрейм данных для использования? Я пробовал все способы, с которыми сталкивался, и, похоже, не добился никакого прогресса.
Вот код, который генерирует результаты
#officeSummaryDFBefore
column_names = "Country|StockCode|YYYY-MM|Quantity"
monthlyCountsBeforeImpactDate = spark.createDataFrame(
[
tuple('' for i in column_names.split("|"))
],
column_names.split("|")
).where("1=0")
monthlyCountsBeforeImpacteDateRow = spark.createDataFrame(
[
tuple('' for i in column_names.split("|"))
],
column_names.split("|")
).where("1=0")
try :
for country in country_lookup :
country = country[0]
print(country_count, " country(s) left")
country_count = country_count - 1
for stockCode in stockCode_lookup :
stockCode = stockCode[0]
monthlyCountsBeforeImpacteDateRow = dataBeforeImpactDate.filter((col("Country").rlike(country)) & (col("StockCode").rlike(stockCode))).groupby("Country", "StockCode", "YYYY-MM").sum()
monthlyCountsBeforeImpacteDateRow.show()
dfsCountsBefore = [monthlyCountsBeforeImpacteDateRow, monthlyCountsBeforeImpactDate]
monthlyCountsBeforeImpactDate = reduce(DataFrame.union, dfsCountsBefore)
except Exception as e:
print(e)
Я объявляю dfsCountsBeforeImpactDate внутри l oop что не кажется правильным, но когда он выходит за пределы, возвращается как NULL.