Почему Featuretools замедляется, когда я увеличиваю количество работников Dask? - PullRequest
0 голосов
/ 07 марта 2020

Я пользуюсь ноутбуком Amazon SageMaker с 72 ядрами и 144 ГБ оперативной памяти, и я провел 2 теста с выборкой полных данных, чтобы проверить, работает ли кластер Dask.

Пример имеет 4500 строк и 735 столбцов из 5 разных «активов» (я имею в виду 147 столбцов для каждого актива). Код фильтрует столбцы и создает матрицу объектов для каждого отфильтрованного кадра данных.

Сначала я инициализировал кластер следующим образом, получил 72 рабочих и 17 минут работы. (Я предполагаю, что я создал 72 рабочих с одним ядром в каждом.)

    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(processes=True,n_workers=72,threads_per_worker=72)

    def main():
      import featuretools as ft
      list_columns = list(df_concat_02.columns)

      list_df_features=[]
      from tqdm.notebook import tqdm

      for asset in tqdm(list_columns,total=len(list_columns)):
        dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()

        es = ft.EntitySet()  
        es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe, 
                                      index = 'index', 
                                      time_index = 'Date')
        fm, features = ft.dfs(entityset=es, 
                              target_entity='MARKET',
                              trans_primitives = ['divide_numeric'],
                              agg_primitives = [],
                              max_depth=1,
                              verbose=True,
                              dask_kwargs={'cluster': client.scheduler.address}

                              )
        list_df_features.append(fm)
      return list_df_features

    if __name__ == "__main__":
        list_df = main()

Во-вторых, я инициализировал кластер следующим образом, я получил 9 рабочих и получил 3,5 минуты работы. (Я предполагаю, что я создал 9 рабочих с 8 ядрами в каждом.)

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True)

def main():
  import featuretools as ft
  list_columns = list(df_concat_02.columns)

  list_df_features=[]
  from tqdm.notebook import tqdm

  for asset in tqdm(list_columns,total=len(list_columns)):
    dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()

    es = ft.EntitySet()  
    es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe, 
                                  index = 'index', 
                                  time_index = 'Date')
    fm, features = ft.dfs(entityset=es, 
                          target_entity='MARKET',
                          trans_primitives = ['divide_numeric'],
                          agg_primitives = [],
                          max_depth=1,
                          verbose=True,
                          dask_kwargs={'cluster': client.scheduler.address}

                          )
    list_df_features.append(fm)
  return list_df_features

if __name__ == "__main__":
    list_df = main()

Для меня это сногсшибательно, потому что я думал, что 72 рабочих могут выполнять работу быстрее! Когда-то я не специалист ни по Dask, ни по FeatureTools, наверное, я что-то не так настраиваю.

Буду признателен за любую помощь и совет!

Спасибо!

1 Ответ

1 голос
/ 10 марта 2020

Вы правильно устанавливаете dask_kwargs в DFS. Я думаю, что замедление происходит из-за дополнительных издержек и меньшего количества ядер у каждого работника. Чем больше работников, тем больше накладных расходов при передаче данных. Кроме того, можно использовать 8 ядер от 1 рабочего, чтобы вычисления выполнялись быстрее, чем 1 ядро ​​из 8 рабочих.

...