MPI4Py: OpenMPI Как обновить словарь между процессами? - PullRequest
1 голос
/ 06 ноября 2019

В моем сценарии у меня есть среда, из которой я пытаюсь взять образцы. Каждый процесс будет отбирать несколько раз из этой среды.

import numpy as np
class EnvSim(object):
    @staticmethod
    def get():
        return np.random.randint(0, 2000)

from collections import defaultdict
class Dict(object):
    def __init__(self):
        self.d = defaultdict(int)

    def update(self, key):
        self.d[key] += 1
        print(key)
        data_array = [np.empty(1, dtype=np.int) for _ in range(num_cpu)]
        data_array[proc_id()] = np.array([key], dtype=np.int)
        MPI.COMM_WORLD.Bcast(data_array[proc_id()], root=proc_id())
        for data in data_array:
            self.d[data.tolist()[0]] += 1

Цель состоит в том, чтобы каждый процесс OpenMPI делился тем, что они выбрали, из среды синхронно или асинхронно. Является ли Bcast правильным методом для использования здесь, или я должен использовать что-то еще?

Это основное утверждение, которое я использовал для выполнения своей программы :( в настоящее время это не работает.

def mpi_fork(n, bind_to_core=False):
    """
    Re-launches the current script with workers linked by MPI.

    Args:
        n (int): Number of process to split into.

        bind_to_core (bool): Bind each MPI process to a core.
    """
    if n<=1:
        return
    if os.getenv("IN_MPI") is None:
        env = os.environ.copy()
        env.update(
            MKL_NUM_THREADS="1",
            OMP_NUM_THREADS="1",
            IN_MPI="1"
        )
        args = ["mpirun", "-np", str(n)]
        if bind_to_core:
            args += ["-bind-to", "core"]
        args += [sys.executable] + sys.argv
        subprocess.check_call(args, env=env)
        sys.exit()

if __name__ == '__main__':
    num_cpu = 3
    mpi_fork(num_cpu)

    dic = Dict()

    for _ in range(3):
        exp = EnvSim.get()
        dic.update(exp)

    print(dic.d)

1 Ответ

1 голос
/ 06 ноября 2019

Синхронный случай:

Я не уверен, что вы подразумеваете под "синхронно и асинхронно", поэтому я просто сосредоточусь на синхронном случае здесь.

Если вы хотите, чтобы все ранги сэмплировали и отправляли всем, то я думаю, что вы хотите alltoall вместо Bcast.

Ниже приведен пример сценария, в котором каждые rank выборки N значений из интервала (rank,rank+1), где N - это размер коммуникатора.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

senddata = np.random.uniform(rank,rank+1,size)
recvdata = np.empty(size, dtype=float)
comm.Alltoall(senddata, recvdata)

print("process %s sending %s receiving %s " % (rank,senddata,recvdata))

Вместо запуска самого скрипта, вы можете просто вызвать непосредственно из командной строки форму:

$ mpirun -np 3 python test.py

, и вы должны увидеть такой выводкак

Rank 0 sent [0.37362478 0.74304362 0.25090876] and received [0.37362478 1.81852273 2.48959575] 
Rank 1 sent [1.81852273 1.65782547 1.85142608] and received [0.74304362 1.65782547 2.23064501] 
Rank 2 sent [2.48959575 2.23064501 2.644848  ] and received [0.25090876 1.85142608 2.644848  ] 

Это может быть включено в цикл for, если требуется несколько циклов выборки / связи.

Асинхронный случай:

Если есть некоторое ожидание изменчивости во времени для выборки, тогда вы могли бы иметь нулевой ранг, чтобы быть ведущим и выполнять неблокирующие запросы каждого из оставшихся рангов. Например:

from mpi4py import MPI
import numpy as np
from time import sleep

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

tag_denoting_ready_to_send = 1
while 1:
    if comm.rank == 0:
        if comm.Iprobe(source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send):
            buffer_for_receiving = np.empty(1, dtype='i')
            comm.Recv([buffer_for_receiving, MPI.INT], source=MPI.ANY_SOURCE, tag=tag_denoting_ready_to_send)
            print(buffer_for_receiving[0])
    else:
        sleep(comm.rank*np.random.uniform())
        send_buffer = np.array(rank, dtype='i')
        comm.Send([send_buffer, MPI.INT], dest=0, tag=tag_denoting_ready_to_send)

Каждый ненулевой ранг спит и пытается Send их ранг в буфере до ранга 0 (который выводит это). Опять же, работа с

$ mpirun -np 20 python test2.py 

должна привести к выводу, например:

13
6
1
1
2
7
1
2
1
4
1
8
3
...