Как запустить Dask Client через звонок из другого скрипта? - PullRequest
0 голосов
/ 07 ноября 2018

У меня есть обработка, которая выполняется в Luigi, на одном из этапов я выполняю серию вычислений в DataFrame. Для ускорения я решил использовать локальный кластер Dask. Когда я запускаю через Python или Jupyter, кластер идет вверх, и я все запускаю правильно, но когда он работает внутри Luigi, он выдает следующую ошибку:

UserWarning: не удалось запустить диагностический сервер на порту 8787.

df = func(params)
df.to_csv('...')

def func(params):
  df = params.get('df')
  client = Client()
  result = [client.submit(sample, row) for index, row in df.iterrows()]
  result = client.gather(result)
  new_df = pd.DataFrame(result)
  return df

Как это решить?

1 Ответ

0 голосов
/ 06 мая 2019

Это непроверенный код (нет опыта работы с luigi)
Как насчет следующего кода (в виде отдельного модуля) -

from dask.distributed import Client  
df = func(params)
df.to_csv('...')

def func(params):
  df = params.get('df')
  result = [client.submit(sample, row) for index, row in df.iterrows()]
  result = client.gather(result)
  new_df = pd.DataFrame(result)
  return df 

if __name__ == "__main__":  
    with Client() as client:  
        df_result = func(params)
...