У меня есть кадр данных Dask, который сгруппирован, а затем функция применяется к каждой группе. Эта функция использует некоторые предварительно рассчитанные метрики из другого фрейма данных как часть своей работы.
В реальном коде все данные находятся в наборах данных паркета, загруженных из S3 и запущенных в распределенном кластере Dask. Вот упрощенный пример с использованием CSV-файлов.
profiles.csv
company,stat1
1000,10
2000,20
catalog.csv
company,desc
1000,ABC
1000,def
2000,GHI
2000,jkl
код
from dask import dataframe as ddf
profiles_df = ddf.read_csv("profiles.csv").set_index("company")
catalog_df = ddf.read_csv("catalog.csv").set_index("company")
def refine(group_df):
profile = profiles_df.loc[group_df.name].compute()
group_df["desc_"] = group_df["desc"].apply(lambda t: f"{t}-{int(profile.stat1)}")
return group_df
catalog_grouped_df = catalog_df.groupby("company")
refined_catalog_meta = catalog_df._meta.copy()
refined_catalog_meta["desc_"] = None
refined_catalog_df = catalog_grouped_df.apply(refine, meta=refined_catalog_meta)
refined_catalog_df.compute()
Это работает, за исключением того, что источник profiles_df
csv / parquet читается снова и снова для каждого вызова refine(group_df)
. Как мне улучшить это, чтобы profiles_df
читался один раз, и тогда только функция, соответствующая каждой группе, передается или доступна для функции refine
?
Обновление
Мне удалось остановить многократные чтения из исходных наборов данных Parquet, прочитав файл profile_df и разбросав его. Примерно так:
from dask import dataframe as ddf
from dask.distributed import default_client
profiles_df = ddf.read_csv("profiles.csv").set_index("company")
catalog_df = ddf.read_csv("catalog.csv").set_index("company")
def refine(group_df):
profile = profiles_df.loc[group_df.name].compute()
group_df["desc_"] = group_df["desc"].apply(lambda t: f"{t}-{int(profile.stat1)}")
return group_df
profiles_df = default_client().scatter(profiles_df.compute(), broadcast=True)
catalog_grouped_df = catalog_df.groupby("company")
refined_catalog_meta = catalog_df._meta.copy()
refined_catalog_meta["desc_"] = None
refined_catalog_df = catalog_grouped_df.apply(refine, meta=refined_catalog_meta)
refined_catalog_df.compute()
…
Основным недостатком является то, что profiles_df
читается вызывающему клиенту, а затем отправляется в планировщик. Есть ли способ заставить планировщика или работника выполнять чтение и разбрасывание?