У меня есть распределенный кластер dask, и я использовал его для загрузки и преобразования данных. Работает как шарм.
Я хочу использовать его для параллельной обработки. Вот моя функция
el = 5000
n_using = 26
n_across= 6
mat = np.random.random((el,n_using,n_across))
idx = np.tril_indices(n_across*2, -n_across)
def get_vals(c1, m, el, idx):
m1 = m[c1,:,:]
corr_vals = np.zeros((el, (n_across//2)*(n_across+1)))
for c2 in range(c1+1, el):
corr = np.corrcoef(m1.T, m[c2,:,:].T)
corr_vals[c2] = corr[idx]
return corr_vals
lazy_get_val = dask.delayed(get_vals, pure=True)
Вот одна версия процессора, которую я пытаюсь сделать:
arrays = [get_vals(c1, mat, el, idx) for c1 in range(el)]
all_corr = np.stack(arrays, axis=0)
Работает нормально, но занимает несколько часов.
Вот мой способ сделать это в сумерках:
lazy_list = [lazy_get_val(c1, mat, el, idx) for c1 in range(el)]
arrays = [da.from_delayed(lazy_item, dtype=float, shape=(el, 21)) for lazy_item in lazy_list]
all_corr = da.stack(arrays, axis=0)
Даже если он запускается all_corr[1].compute()
, он просто сидит и не отвечает. Когда я прерываю ядро, оно застревает в /distributed/utils.py:
~ /.../ lib / python3.6 / site-packages / distrib / utils.py в синхронизации (loop, func, * args, ** kwargs)
249 else:
250 while not e.is_set():
--> 251 e.wait(10)
252 if error[0]:
253 six.reraise(*error[0])
Есть предложения по отладке?
Другие вещи:
- Если я запускаю его с меньшим
mat
(el = 1000), и он работает нормально.
- Если я сделаю
el = 5000
, он зависнет.
- Если я прерываю ядро и снова запускаю его с
el = 1000
, оно зависает.