Я использую Dask для распараллеливания анализа космических снимков временных рядов на кластере со значительным количеством вычислительных ресурсов.
Я установил распределенный планировщик со многими рабочими (--nprocs = 56
), каждый из которых управляет одним потоком (--nthreads = 1
) и 4 ГБ памяти из-за смущающе параллельного характера работы.
Мои данные поступают в виде xarray, который разбивается на части в массиве dask, и map_blocks используется для отображения функции на каждый чанк, чтобы сгенерировать выходной массив, который будет сохранен в файл изображения.
data = inputArray.chunk(chunks={'y':1})
client.persist(data)
future = data.data.map_blocks(timeSeriesTrends.timeSeriesTrends, jd, drop_axis=[1])
future = client.persist(future)
dask.distributed.wait(future)
outputArray = future.compute()
Моя проблема в том, что Dask не использует все ресурсы, которые я ему выделил. Вместо этого он начинается с очень небольшого числа распараллеленных задач и постепенно добавляет больше по мере того, как процессы заканчивают работу, даже не достигая емкости.
Это резко ограничивает возможности оборудования, к которому у меня есть доступ, так как многие из моих ресурсов проводят большую часть своего времени в бездействии.
Подходит ли мой подход для генерации выходного массива из входного массива? Как лучше всего использовать оборудование, к которому у меня есть доступ, в этой ситуации?