Dask: эффективный способ памяти, объединяющий различные фрагменты данных и объединяющий их вместе - PullRequest
0 голосов
/ 05 января 2019

У меня есть фрейм данных Dask (df) с историями заказов для клиентов, которые я прочитал из файла CSV.

За исключением:

| customer_id | order_date | order_number |   sales  |
|-------------|------------|--------------|----------|
|  109230900  |  20190104  |   order101   |  210.50  |

Сначала я получаю общую сумму продаж на одного клиента. Далее я хочу получить агрегаты продаж за различные интервалы дат: продажи за последние 7 дней, последние 14 дней и последние 30 дней. Сегодняшняя дата 20190104 (ГГГГММДД).

Вот результат, который я хочу:

| customer_id | total_sales | sales_for_past_7_days |   sales_for_past_14_days |   sales_for_past_30_days |
|-------------|-------------|-----------------------|--------------------------|--------------------------|
|  109230900  |   5105.10   |         210.50        |           320.00         |           1045.05        |

Моя попытка:

import datetime as dt
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df = dd.read_csv('order_history.csv', blocksize = 10e6)

total_sales = df.groupby('customer_id').agg({'sales':'sum'}, split_out=100).rename(columns={'sales':'total_sales'})`

total_sales = client.persist(total_sales)

end_date = dt.datetime.strptime('20190104','%Y%m%d') 

for interval in [7,14,30]:
    start_date = int(end_date - dt.timedelta(days=interval )).strftime('%Y%m%d'))
    newcol = 'sales_for_past_{}_days'.format(interval)

    tempdf = df[df.order_date > start_date].groupby('CUSTOMER_ID')

    tempagg = tempdf.agg({'sales':'sum'}, split_out=100).rename(columns={'sales':newcol})

    total_sales = dd.merge(total_sales, tempagg, how='left', left_index=True, right_index=True)

total_sales = client.persist(total_sales)

print(total_sales.head())

Есть ли более разумный способ сделать это? При выполнении кода выше, я получаю множество предупреждений о проблемах с памятью и о слишком большом количестве задач. Похоже, что около 2 миллионов заданий.

Все это работает как кластер с одним компьютером на Linux-машине с 16 ГБ ОЗУ и 16 ядрами. Код запускается как скрипт Python, а не в Jupyter Notebook.

python==3.7.0
dask==1.0.0
distributed=1.25.1
tornado==5.1
...