Я пытаюсь использовать Dask для параллельной обработки на нескольких узлах на суперкомпьютерных ресурсах - но карта, распределенная по Dask, использует только один из узлов.
Вот тестовый скрипт, который я использую длянастройте клиент и выполните простую операцию:
import time
from distributed import Client
from dask_jobqueue import SLURMCluster
from socket import gethostname
def slow_increment(x):
time.sleep(10)
return [x + 1, gethostname(), time.time()]
cluster = SLURMCluster(
queue='somequeue',
cores=2,
memory='128GB',
project='someproject',
walltime='00:05:00',
job_extra=['-o myjob.%j.%N.out',
'-e myjob.%j.%N.error'],
env_extra=['export I_MPI_FABRICS=dapl',
'source activate dask-jobqueue'])
cluster.scale(2)
client = Client(cluster)
A = client.map(slow_increment, range(8))
B = client.gather(A)
print(client)
for res in B:
print(res)
client.close()
А вот вывод:
<Client: scheduler='tcp://someip' processes=2 cores=4>
[1, 'bdw-0478', 1540477582.6744401]
[2, 'bdw-0478', 1540477582.67487]
[3, 'bdw-0478', 1540477592.68666]
[4, 'bdw-0478', 1540477592.6879778]
[5, 'bdw-0478', 1540477602.6986163]
[6, 'bdw-0478', 1540477602.6997452]
[7, 'bdw-0478', 1540477612.7100565]
[8, 'bdw-0478', 1540477612.711296]
При распечатке информации о клиенте указывается, что Dask имеет правильное количество узлов (процессов) и задач для каждого узла (ядра), выходные данные socket.gethostname () и отметки времени указывают, что второй узел не используется.Я знаю, что dask-jobqueue успешно запросил два узла и что оба задания выполняются одновременно.Я пытался использовать разные матрицы MPI для связи между узлами и внутри узла (например, tcp, shm: tcp, shm: ofa, ofa, ofi, dapl), но это не изменило результат.Я также попытался удалить команду «export I_MPI_FABRICS» и использовать опцию «interface», но это привело к зависанию кода.