Рассеяние словаря с распределением Dask приводит к ошибке «Возврат» - PullRequest
0 голосов
/ 21 февраля 2020

Я использую распределенные вычисления Dask в кластере. Один из моих расчетов состоит в том, чтобы взять подмножества набора данных, чтобы применить линейную регрессию к каждому из них. Учитывая, что число подмножеств, которые я беру, составляет порядка сотен, я решил распределить вычисления.

Расчеты не выполнялись быстрее, чем последовательные, потому что все рабочие должны были прочитать подмножества данных из мой основной процесс, где находится набор данных. Тогда я решил разбросать подмножества по рабочим. Затем я применяю функцию к этим фьючерсам. Несмотря на то, что он работает, он возвращает ошибку с запутанным описанием, а затем печатает словарь фьючерсов (он показан ниже). Что здесь возможно?

Return                                    Traceback (most recent call last)
<ipython-input-48-9bd06b2945c7> in <module>
      4    for vri in vris:
      5     start=timeit.default_timer()
----> 6     feedbacks[dim][cas][wnn][vri]=feedback(vri,cas,wins[wnn],dim)
      7     stop=timeit.default_timer()
      8     print("Processed {3} ({0}) of case {1} in window {2} in {4:0.0f} seconds ".format(dim,cas,wnn,vri,stop-start))

<ipython-input-46-4f331fdf2c9e> in feedback(variable, case, window, dim)
    279             ):
    280  if dim == "0d":
--> 281   lmbda=feedback_method_1(variable,case,window,dim="0d")
    282  elif dim == "1d":
    283   ilats=range(tables["1d"]["free"]["lat"].shape[1])

<ipython-input-46-4f331fdf2c9e> in feedback_method_1(variable, case, window, ilat, ilon, dim)
    213  xvar=variable_slice("tsurf",case,window,ilat,ilon,dim="0d")
    214  yvar=variable_slice(variable,case,window,ilat,ilon,dim)
--> 215  lmbda=reg_ens(xvar,yvar,sn=300)["slope"]   # Get the slope distribution
    216  return lmbda
    217 

<ipython-input-46-4f331fdf2c9e> in reg_ens(x, y, sn)
     80    chunks={ i: (x[list(samples[i])],
     81                 y[list(samples[i])]) for i in range(len(samples)) }
---> 82    chunks=client2.scatter(chunks)
     83    print(chunks)
     84    for i in range(len(samples)):

/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, timeout, asynchronous)
   2089             timeout=timeout,
   2090             asynchronous=asynchronous,
-> 2091             hash=hash,
   2092         )
   2093 

/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    765         else:
    766             return sync(
--> 767                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    768             )
    769 

/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    343     if error[0]:
    344         typ, exc, tb = error[0]
--> 345         raise exc.with_traceback(tb)
    346     else:
    347         return result[0]

/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/utils.py in f()
    327             if callback_timeout is not None:
    328                 future = asyncio.wait_for(future, callback_timeout)
--> 329             result[0] = yield future
    330         except Exception as exc:
    331             error[0] = sys.exc_info()

/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/work/mh0066/m300556/miniconda3/envs/glamdring/lib/python3.7/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
   1904         ):
   1905             d = await self._scatter(keymap(tokey, data), workers, broadcast)
-> 1906             raise gen.Return({k: d[tokey(k)] for k in data})
   1907 
   1908         if isinstance(data, type(range(0))):

Return: {0: <Future: finished, type: builtins.tuple, key: 0>, 1: <Future: finished, type: builtins.tuple, key: 1>, 2: <Future: finished, type: builtins.tuple, key: 2>,...
...