Эффективная передача дополнительной информации по группе в функцию Dask dataframe.groupby.apply - PullRequest
0 голосов
/ 20 апреля 2020

У меня есть кадр данных Dask, который сгруппирован, а затем функция применяется к каждой группе. Эта функция использует некоторые предварительно рассчитанные метрики из другого фрейма данных как часть своей работы.

В реальном коде все данные находятся в наборах данных паркета, загруженных из S3 и запущенных в распределенном кластере Dask. Вот упрощенный пример с использованием CSV-файлов.

profiles.csv

company,stat1
1000,10
2000,20

catalog.csv

company,desc
1000,ABC
1000,def
2000,GHI
2000,jkl

код

from dask import dataframe as ddf

profiles_df = ddf.read_csv("profiles.csv").set_index("company")
catalog_df = ddf.read_csv("catalog.csv").set_index("company")

def refine(group_df):
    profile = profiles_df.loc[group_df.name].compute()
    group_df["desc_"] = group_df["desc"].apply(lambda t: f"{t}-{int(profile.stat1)}")
    return group_df

catalog_grouped_df = catalog_df.groupby("company")
refined_catalog_meta = catalog_df._meta.copy()
refined_catalog_meta["desc_"] = None
refined_catalog_df = catalog_grouped_df.apply(refine, meta=refined_catalog_meta)
refined_catalog_df.compute()

Это работает, за исключением того, что источник profiles_df csv / parquet читается снова и снова для каждого вызова refine(group_df). Как мне улучшить это, чтобы profiles_df читался один раз, и тогда только функция, соответствующая каждой группе, передается или доступна для функции refine?

Обновление

Мне удалось остановить многократные чтения из исходных наборов данных Parquet, прочитав файл profile_df и разбросав его. Примерно так:

from dask import dataframe as ddf
from dask.distributed import default_client

profiles_df = ddf.read_csv("profiles.csv").set_index("company")
catalog_df = ddf.read_csv("catalog.csv").set_index("company")

def refine(group_df):
    profile = profiles_df.loc[group_df.name].compute()
    group_df["desc_"] = group_df["desc"].apply(lambda t: f"{t}-{int(profile.stat1)}")
    return group_df

profiles_df = default_client().scatter(profiles_df.compute(), broadcast=True)
catalog_grouped_df = catalog_df.groupby("company")
refined_catalog_meta = catalog_df._meta.copy()
refined_catalog_meta["desc_"] = None
refined_catalog_df = catalog_grouped_df.apply(refine, meta=refined_catalog_meta)
refined_catalog_df.compute()
…

Основным недостатком является то, что profiles_df читается вызывающему клиенту, а затем отправляется в планировщик. Есть ли способ заставить планировщика или работника выполнять чтение и разбрасывание?

...