Я новичок в Dask и в работе с кластером и использую следующий код, чтобы убедиться, что правильно подключаюсь к кластеру и использую его. Сейчас я получаю приведённую ниже ошибку. Сначала я использовал dask_drmaa для импорта SGECluster.
import pandas as pd
import dask.dataframe
import dask.multiprocessing
from dask.distributed import Client
from dask_jobqueue import SGECluster
def execute(row):
    """Run ``test_function_dataframe`` on a single row and return its result.

    The imports live inside the function body, presumably so that they are
    resolved in whichever process actually executes the call (e.g. a cluster
    worker) -- confirm against the deployment.

    Args:
        row: The row object handed over by ``DataFrame.apply(..., axis=1)``.

    Returns:
        Whatever ``test_function.test_function_dataframe(row)`` returns.
    """
    import os
    import sys

    # Make the current working directory importable so ``test_function`` can
    # be found. Guard the append: this function is called once per row, and an
    # unconditional append would add a duplicate entry to sys.path every time.
    working_dir = os.getcwd()
    if working_dir not in sys.path:
        sys.path.append(working_dir)

    from test_function import test_function_dataframe

    return test_function_dataframe(row)
# Build a small example frame and wrap it as a Dask DataFrame (chunksize=1).
df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
ddf = dask.dataframe.from_pandas(df, chunksize=1)

# Launch an SGE-backed cluster; the try/finally blocks guarantee that the
# client and the cluster are shut down even if the computation raises.
cluster = SGECluster(cores=2, memory="1GB")
try:
    client = Client(cluster)
    try:
        # Request one worker per input row. ``scale`` replaces the deprecated
        # ``start_workers`` call.
        cluster.scale(len(df))
        print("Web interface opened on port %d" % client.scheduler_info()["services"]["bokeh"])

        # NOTE(review): the "unexpected keyword argument 'broadcast'" error in
        # the traceback suggests the installed dask forwards apply() options
        # that the installed pandas no longer accepts -- confirm the two
        # package versions are compatible.
        results = ddf.apply(execute, axis=1, meta=[('a', 'i8'), ('b', 'i8'), ('c', 'i8'), ('d', 'O')])

        # Creating the Client registers it as the default scheduler, so the
        # deprecated ``get=client.get`` argument is not needed.
        df = results.compute()
        print(df)
    finally:
        client.close()
finally:
    cluster.close()
Ошибка:
Web interface opened on port 8787
Traceback (most recent call last):
File "example_dataframe.py", line 33, in <module>
df = results.compute(get=client.get)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/dask/base.py", line 156, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/dask/base.py", line 395, in compute
results = schedule(dsk, keys, **kwargs)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/client.py", line 2218, in get
direct=direct)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/client.py", line 1581, in gather
asynchronous=asynchronous)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/client.py", line 647, in sync
return sync(self.loop, func, *args, **kwargs)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/utils.py", line 277, in sync
six.reraise(*error[0])
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/six.py", line 703, in reraise
raise value
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/utils.py", line 262, in f
result[0] = yield future
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/tornado/gen.py", line 742, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/distributed/client.py", line 1457, in _gather
traceback)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/six.py", line 702, in reraise
raise value.with_traceback(tb)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/dask/dataframe/core.py", line 3660, in apply_and_enforce
df = func(*args, **kwargs)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/dask/utils.py", line 688, in __call__
return getattr(obj, self.method)(*args, **kwargs)
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/pandas/core/frame.py", line 6878, in apply
return op.get_result()
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/pandas/core/apply.py", line 186, in get_result
return self.apply_standard()
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/pandas/core/apply.py", line 296, in apply_standard
values, self.f, axis=self.axis, dummy=dummy, labels=labels
File "pandas/_libs/reduction.pyx", line 618, in pandas._libs.reduction.compute_reduction
File "pandas/_libs/reduction.pyx", line 128, in pandas._libs.reduction.Reducer.get_result
File "/data/cluster/users/khodabakhshandeh/anaconda3/envs/tf/lib/python3.6/site-packages/pandas/core/apply.py", line 113, in f
return func(x, *args, **kwds)
TypeError: execute() got an unexpected keyword argument 'broadcast'