Я пользуюсь ноутбуком Amazon SageMaker с 72 ядрами и 144 ГБ оперативной памяти, и я провел 2 теста с выборкой полных данных, чтобы проверить, работает ли кластер Dask.
Пример имеет 4500 строк и 735 столбцов из 5 разных «активов» (я имею в виду 147 столбцов для каждого актива). Код фильтрует столбцы и создает матрицу объектов для каждого отфильтрованного кадра данных.
Сначала я инициализировал кластер следующим образом, получил 72 рабочих и 17 минут работы. (Я предполагаю, что я создал 72 рабочих с одним ядром в каждом.)
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True,n_workers=72,threads_per_worker=72)
def main():
import featuretools as ft
list_columns = list(df_concat_02.columns)
list_df_features=[]
from tqdm.notebook import tqdm
for asset in tqdm(list_columns,total=len(list_columns)):
dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()
es = ft.EntitySet()
es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe,
index = 'index',
time_index = 'Date')
fm, features = ft.dfs(entityset=es,
target_entity='MARKET',
trans_primitives = ['divide_numeric'],
agg_primitives = [],
max_depth=1,
verbose=True,
dask_kwargs={'cluster': client.scheduler.address}
)
list_df_features.append(fm)
return list_df_features
if __name__ == "__main__":
list_df = main()
Во-вторых, я инициализировал кластер следующим образом, я получил 9 рабочих и получил 3,5 минуты работы. (Я предполагаю, что я создал 9 рабочих с 8 ядрами в каждом.)
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True)
def main():
import featuretools as ft
list_columns = list(df_concat_02.columns)
list_df_features=[]
from tqdm.notebook import tqdm
for asset in tqdm(list_columns,total=len(list_columns)):
dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()
es = ft.EntitySet()
es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe,
index = 'index',
time_index = 'Date')
fm, features = ft.dfs(entityset=es,
target_entity='MARKET',
trans_primitives = ['divide_numeric'],
agg_primitives = [],
max_depth=1,
verbose=True,
dask_kwargs={'cluster': client.scheduler.address}
)
list_df_features.append(fm)
return list_df_features
if __name__ == "__main__":
list_df = main()
Для меня это сногсшибательно, потому что я думал, что 72 рабочих могут выполнять работу быстрее! Когда-то я не специалист ни по Dask, ни по FeatureTools, наверное, я что-то не так настраиваю.
Буду признателен за любую помощь и совет!
Спасибо!