У меня есть фрейм данных Dask (df
) с историями заказов для клиентов, которые я прочитал из файла CSV.
За исключением:
| customer_id | order_date | order_number | sales |
|-------------|------------|--------------|----------|
| 109230900 | 20190104 | order101 | 210.50 |
Сначала я получаю общую сумму продаж на одного клиента. Далее я хочу получить агрегаты продаж за различные интервалы дат: продажи за последние 7 дней, последние 14 дней и последние 30 дней. Сегодняшняя дата 20190104
(ГГГГММДД).
Вот результат, который я хочу:
| customer_id | total_sales | sales_for_past_7_days | sales_for_past_14_days | sales_for_past_30_days |
|-------------|-------------|-----------------------|--------------------------|--------------------------|
| 109230900 | 5105.10 | 210.50 | 320.00 | 1045.05 |
Моя попытка:
import datetime as dt
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
df = dd.read_csv('order_history.csv', blocksize = 10e6)
total_sales = df.groupby('customer_id').agg({'sales':'sum'}, split_out=100).rename(columns={'sales':'total_sales'})`
total_sales = client.persist(total_sales)
end_date = dt.datetime.strptime('20190104','%Y%m%d')
for interval in [7,14,30]:
start_date = int(end_date - dt.timedelta(days=interval )).strftime('%Y%m%d'))
newcol = 'sales_for_past_{}_days'.format(interval)
tempdf = df[df.order_date > start_date].groupby('CUSTOMER_ID')
tempagg = tempdf.agg({'sales':'sum'}, split_out=100).rename(columns={'sales':newcol})
total_sales = dd.merge(total_sales, tempagg, how='left', left_index=True, right_index=True)
total_sales = client.persist(total_sales)
print(total_sales.head())
Есть ли более разумный способ сделать это? При выполнении кода выше, я получаю множество предупреждений о проблемах с памятью и о слишком большом количестве задач. Похоже, что около 2 миллионов заданий.
Все это работает как кластер с одним компьютером на Linux-машине с 16 ГБ ОЗУ и 16 ядрами. Код запускается как скрипт Python, а не в Jupyter Notebook.
python==3.7.0
dask==1.0.0
distributed=1.25.1
tornado==5.1