Синхронный случай:
Я не уверен, что вы подразумеваете под "синхронно и асинхронно", поэтому я просто сосредоточусь на синхронном случае здесь.
Если вы хотите, чтобы все ранги сэмплировали и отправляли всем, то я думаю, что вы хотите alltoall
вместо Bcast
.
Ниже приведен пример сценария, в котором каждые rank
выборки N
значений из интервала (rank,rank+1)
, где N
- это размер коммуникатора.
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
senddata = np.random.uniform(rank,rank+1,size)
recvdata = np.empty(size, dtype=float)
comm.Alltoall(senddata, recvdata)
print("process %s sending %s receiving %s " % (rank,senddata,recvdata))
Вместо запуска самого скрипта, вы можете просто вызвать непосредственно из командной строки форму:
$ mpirun -np 3 python test.py
, и вы должны увидеть такой выводкак
Rank 0 sent [0.37362478 0.74304362 0.25090876] and received [0.37362478 1.81852273 2.48959575]
Rank 1 sent [1.81852273 1.65782547 1.85142608] and received [0.74304362 1.65782547 2.23064501]
Rank 2 sent [2.48959575 2.23064501 2.644848 ] and received [0.25090876 1.85142608 2.644848 ]
Это может быть включено в цикл for
, если требуется несколько циклов выборки / связи.
Асинхронный случай:
Если есть некоторое ожидание изменчивости во времени для выборки, тогда вы могли бы иметь нулевой ранг, чтобы быть ведущим и выполнять неблокирующие запросы каждого из оставшихся рангов. Например:
from mpi4py import MPI
import numpy as np
from time import sleep
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
tag_denoting_ready_to_send = 1
while 1:
if comm.rank == 0:
if comm.Iprobe(source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send):
buffer_for_receiving = np.empty(1, dtype='i')
comm.Recv([buffer_for_receiving, MPI.INT], source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send)
print(buffer_for_receiving[0])
else:
sleep(comm.rank*np.random.uniform())
send_buffer = np.array(rank, dtype='i')
comm.Send([send_buffer, MPI.INT], dest=0, tag=tag_denoting_ready_to_send)
Каждый ненулевой ранг спит и пытается Send
их ранг в буфере до ранга 0 (который выводит это). Опять же, работа с
$ mpirun -np 20 python test2.py
должна привести к выводу, например:
13
6
1
1
2
7
1
2
1
4
1
8
3