Получение Dask map_blocks для использования всех доступных ресурсов - PullRequest
0 голосов
/ 01 ноября 2018

Я использую Dask для распараллеливания анализа космических снимков временных рядов на кластере со значительным количеством вычислительных ресурсов.

Я установил распределенный планировщик со многими рабочими (--nprocs = 56), каждый из которых управляет одним потоком (--nthreads = 1) и 4 ГБ памяти из-за смущающе параллельного характера работы.

Мои данные поступают в виде xarray, который разбивается на части в массиве dask, и map_blocks используется для отображения функции на каждый чанк, чтобы сгенерировать выходной массив, который будет сохранен в файл изображения.

data = inputArray.chunk(chunks={'y':1})                
client.persist(data)            
future = data.data.map_blocks(timeSeriesTrends.timeSeriesTrends, jd, drop_axis=[1])            
future = client.persist(future)                
dask.distributed.wait(future)
outputArray = future.compute()

Моя проблема в том, что Dask не использует все ресурсы, которые я ему выделил. Вместо этого он начинается с очень небольшого числа распараллеленных задач и постепенно добавляет больше по мере того, как процессы заканчивают работу, даже не достигая емкости.

Это резко ограничивает возможности оборудования, к которому у меня есть доступ, так как многие из моих ресурсов проводят большую часть своего времени в бездействии.

Подходит ли мой подход для генерации выходного массива из входного массива? Как лучше всего использовать оборудование, к которому у меня есть доступ, в этой ситуации?

...