Я использую распределенные вычисления Dask в кластере. Один из моих расчетов состоит в том, чтобы взять подмножества набора данных, чтобы применить линейную регрессию к каждому из них. Учитывая, что число подмножеств, которые я беру, составляет порядка сотен, я решил распределить вычисления.
Расчеты не выполнялись быстрее, чем последовательные, потому что все рабочие должны были прочитать подмножества данных из мой основной процесс, где находится набор данных. Тогда я решил разбросать подмножества по рабочим. Затем я применяю функцию к этим фьючерсам. Несмотря на то, что он работает, он возвращает ошибку с запутанным описанием, а затем печатает словарь фьючерсов (он показан ниже). Что здесь возможно?
Return Traceback (most recent call last)
<ipython-input-48-9bd06b2945c7> in <module>
4 for vri in vris:
5 start=timeit.default_timer()
----> 6 feedbacks[dim][cas][wnn][vri]=feedback(vri,cas,wins[wnn],dim)
7 stop=timeit.default_timer()
8 print("Processed {3} ({0}) of case {1} in window {2} in {4:0.0f} seconds ".format(dim,cas,wnn,vri,stop-start))
<ipython-input-46-4f331fdf2c9e> in feedback(variable, case, window, dim)
279 ):
280 if dim == "0d":
--> 281 lmbda=feedback_method_1(variable,case,window,dim="0d")
282 elif dim == "1d":
283 ilats=range(tables["1d"]["free"]["lat"].shape[1])
<ipython-input-46-4f331fdf2c9e> in feedback_method_1(variable, case, window, ilat, ilon, dim)
213 xvar=variable_slice("tsurf",case,window,ilat,ilon,dim="0d")
214 yvar=variable_slice(variable,case,window,ilat,ilon,dim)
--> 215 lmbda=reg_ens(xvar,yvar,sn=300)["slope"] # Get the slope distribution
216 return lmbda
217
<ipython-input-46-4f331fdf2c9e> in reg_ens(x, y, sn)
80 chunks={ i: (x[list(samples[i])],
81 y[list(samples[i])]) for i in range(len(samples)) }
---> 82 chunks=client2.scatter(chunks)
83 print(chunks)
84 for i in range(len(samples)):
/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, timeout, asynchronous)
2089 timeout=timeout,
2090 asynchronous=asynchronous,
-> 2091 hash=hash,
2092 )
2093
/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
765 else:
766 return sync(
--> 767 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
768 )
769
/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
343 if error[0]:
344 typ, exc, tb = error[0]
--> 345 raise exc.with_traceback(tb)
346 else:
347 return result[0]
/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/utils.py in f()
327 if callback_timeout is not None:
328 future = asyncio.wait_for(future, callback_timeout)
--> 329 result[0] = yield future
330 except Exception as exc:
331 error[0] = sys.exc_info()
/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
1904 ):
1905 d = await self._scatter(keymap(tokey, data), workers, broadcast)
-> 1906 raise gen.Return({k: d[tokey(k)] for k in data})
1907
1908 if isinstance(data, type(range(0))):
Return: {0: <Future: finished, type: builtins.tuple, key: 0>, 1: <Future: finished, type: builtins.tuple, key: 1>, 2: <Future: finished, type: builtins.tuple, key: 2>,...