Ошибка использования Dask в кластере SGE || execute () получил неожиданный аргумент ключевого слова 'broadcast' - PullRequest
0 голосов
/ 25 февраля 2020

Я новичок в использовании Dask и Cluster и использовании следующего кода, чтобы убедиться, что я правильно использую / подключаю кластер. Теперь я получаю следующую ошибку. Сначала я использовал dask_drmaa для импорта SGECluster.

import pandas as pd
import dask.dataframe
import dask.multiprocessing
from dask.distributed import Client
from dask_jobqueue import SGECluster

def execute(row):
    import os
    import sys
    sys.path.append(os.getcwd())
    from test_function import test_function_dataframe
    return test_function_dataframe(row)




df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
ddf = dask.dataframe.from_pandas(df, chunksize=1)


cluster = SGECluster(cores=2, memory="1GB")
client = Client(cluster)
cluster.start_workers(len(df))

print("Web interface opened on port %d" % client.scheduler_info()["services"]["bokeh"])


results = ddf.apply(execute, axis=1, meta=[('a', 'i8'), ('b', 'i8'), ('c', 'i8'), ('d', 'O')])

df = results.compute(get=client.get)
print(df)


cluster.close()

Ошибка:

Web interface opened on port 8787
Traceback (most recent call last):
  File "example_dataframe.py", line 33, in <module>
    df = results.compute(get=client.get)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/dask/base.py", line 395, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/client.py", line 2218, in get
    direct=direct)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/client.py", line 1581, in gather
    asynchronous=asynchronous)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/client.py", line 647, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/utils.py", line 277, in sync
    six.reraise(*error[0])
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/six.py", line 703, in reraise
    raise value
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/utils.py", line 262, in f
    result[0] = yield future
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/client.py", line 1457, in _gather
    traceback)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/six.py", line 702, in reraise
    raise value.with_traceback(tb)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/dask/dataframe/core.py", line 3660, in apply_and_enforce
    df = func(*args, **kwargs)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/dask/utils.py", line 688, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/pandas/core/frame.py", line 6878, in apply
    return op.get_result()
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/pandas/core/apply.py", line 186, in get_result
    return self.apply_standard()
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/pandas/core/apply.py", line 296, in apply_standard
    values, self.f, axis=self.axis, dummy=dummy, labels=labels
  File "pandas/_libs/reduction.pyx", line 618, in pandas._libs.reduction.compute_reduction
  File "pandas/_libs/reduction.pyx", line 128, in pandas._libs.reduction.Reducer.get_result
  File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/pandas/core/apply.py", line 113, in f
    return func(x, *args, **kwds)
TypeError: execute() got an unexpected keyword argument 'broadcast'
...