Многопроцессорная обработка Python и многоузловая MPI - PullRequest
0 голосов
/ 22 мая 2018

Вот моя проблема.Я работаю над существующим многопроцессорным кодом Python (работающим на одном узле), и цель состоит в том, чтобы выполнить многоузловое выполнение этого кода с использованием MPI для Python (mpi4py).

Связь MPI между 2 узлами осуществляется только основным потоком каждого процесса MPI (поток, который вызывает MPI.Init (), вы можете узнать, какой из них это сделал, вызвав функцию MPI.Is_thread_main ().) но, к сожалению, не работают.

Фактически MPI-коммуникация не работает ПОСЛЕ запуска процессов python.

Чтобы объяснить проблему, я переписываю короткий код с точно такой же проблемой.

import os
import psutil
import multiprocessing
import numpy as np
import Queue
import time

from mpi4py import rc
rc.initialize = False # if = True, The Init is done when "from mpi4py import MPI" is called
rc.thread_level = 'funneled'

from mpi4py import MPI


def infiniteloop(arg):
    while True:
        print arg
        time.sleep(1)
        # Check if the worker think it's the thread who called MPI.Init()
        print "Worker is Main Thread %s" %(MPI.Is_thread_main())
        print "Rank %d on %s, Process PID for worker = %d" %(MPI.COMM_WORLD.Get_rank(),MPI.Get_processor_name(),os.getpid())



if __name__ == '__main__':

    MPI.Init()
    # In the code I'm working on, MPI.Init() has to be done before the miltiprocess initialization

    proc = multiprocessing.Process(target=infiniteloop, args=('RunningWorker',))
    proc.start()
    print "MultiProcess Stared"


    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size_mpi = comm.Get_size()

    while True:
        print "Running Main Thread"
        print "Main Thread is Main Thread %s" %( MPI.Is_thread_main())
        print "Rank %d on %s, Process PID for main = %s" %(MPI.COMM_WORLD.Get_rank(),MPI.Get_processor_name(),os.getpid())
        print "Rank %d on %s, rc.thread_level = %s" %((MPI.COMM_WORLD.Get_rank(),MPI.Get_processor_name(), rc.thread_level))
        time.sleep(1)

        # Start MPI Communication, It is just an example of 2D array communication which I know it works
        print "Start MPI Transfert"
        #*************** Multiple SEND AND RECEIVE for 2D Array fill randomly
        SumPsfmean = None
        TransPsfmean = None
        TABSIZE = 100
        # Create 100x100 array with random np.float64 values
        # (Because it's really close from the case I'm intersting for)
        # Row per row communication
        if rank == 0:
            psfmean = np.random.rand(TABSIZE,TABSIZE)
            print psfmean.dtype
        else:
            psfmean = np.random.rand(TABSIZE,TABSIZE)

        psfmean_shape = psfmean.shape

        if rank == 0:
            SumPsfmean = np.array(range(psfmean.size*(size_mpi-1)), dtype = np.float64)
            SumPsfmean.shape = (size_mpi-1, psfmean_shape[0], psfmean_shape[1])
            TransPsfmean = np.array(range(psfmean[0].size), dtype = np.float64)

        for i in range(psfmean_shape[1]):
            print "Rank %d : Send&Receive nb %d" %(rank, i)
            if rank == 0:
                comm.Recv(TransPsfmean, source=1, tag=i)
            elif rank ==1:
                comm.Send(psfmean[i], dest=0, tag=i)
            print"End Send&Receive %d" %i
            if rank == 0:
                for k in range(size_mpi):
                    if k != 0:
                        SumPsfmean[k-1][i] = TransPsfmean


    proc.join()

В этом примере только 2 процесса MPI создаются на 2 разных узлах.Таким образом, после инициализации MPI основная функция создаст и запустит процесс Python, затем запустит связь между процессом 2 MPI.MPI_rank_1 будет посылать строку за строкой информацию о двумерном массиве (100 строк), а MPI_rank_0 будет ожидать получения этой информации.

Результат:

[0] MultiProcess Stared
[0] Running Main Thread
[0] Main Thread is Main Thread  : True
[0] Rank 0 on genji271, Process PID for main = 227040
[0] Rank 0 on genji271, rc.thread_level = funneled
[1] MultiProcess Stared
[1] Running Main Thread
[1] Main Thread is Main Thread  : True
[1] Rank 1 on genji272, Process PID for main = 211028
[1] Rank 1 on genji272, rc.thread_level = funneled
[0] RunningWorker[0]
[1] RunningWorker
[0] Start MPI Transfert
[0] float64
[1] Start MPI Transfert
[1] Rank 1 : Send&Receive nb 0
[1] End Send&Receive 0
[1] Rank 1 : Send&Receive nb 1
[1] End Send&Receive 1
[1] Rank 1 : Send&Receive nb 2
[1] End Send&Receive 2
[1] Rank 1 : Send&Receive nb 3
[1] End Send&Receive 3
[1] Rank 1 : Send&Receive nb 4
[1] End Send&Receive 4
[1] Rank 1 : Send&Receive nb 5
[1] End Send&Receive 5[1]
[1] Rank 1 : Send&Receive nb 6[1]
[1] End Send&Receive 6
[1] Rank 1 : Send&Receive nb 7
[1] End Send&Receive 7[1]
[1] Rank 1 : Send&Receive nb 8
[1] End Send&Receive 8
[1] Rank 1 : Send&Receive nb 9
[1] End Send&Receive 9
[1] Rank 1 : Send&Receive nb 10
[1] End Send&Receive 10
[1] Rank 1 : Send&Receive nb 11
[1] End Send&Receive 11
[1] Rank 1 : Send&Receive nb 12
[1] End Send&Receive 12
[1] Rank 1 : Send&Receive nb 13
[1] End Send&Receive 13
[1] Rank 1 : Send&Receive nb 14[1]
[1] End Send&Receive 14
[1] Rank 1 : Send&Receive nb 15
[0] Rank 0 : Send&Receive nb 0
[0] Worker is Main Thread : True
[0] Rank 0 on genji271, Process PID for worker = 227046
[0] RunningWorker
[1] Worker is Main Thread : True
[1] Rank 1 on genji272, Process PID for worker = 211033
[1] RunningWorker
[0] Worker is Main Thread : True
[0] Rank 0 on genji271, Process PID for worker = 227046
[0] RunningWorker
[1] Worker is Main Thread : True
[1] Rank 1 on genji272, Process PID for worker = 211033
[1] RunningWorker
[0] Worker is Main Thread : True
[0] Rank 0 on genji271, Process PID for worker = 227046
...

Как видите, рабочий и основной потоки оба думают, что они являются потоками, вызвавшими MPI.Init ().Более того, обмен данными MPI прекращается между процессом 2 MPI (он прекрасно работает без процесса Python или когда MPI.Init выполняется после создания процесса !!).Фактически, MPI_rank_0, который предполагает получать строки, застревает в первой итерации и никогда не получает эту первую строку.

Я (думаю) понимаю, что процесс python является своего рода клоном основного потока (или, по крайней мере, разделяет / копирует память основного потока при создании процесса).Так что возможно, что MPI не видит разницы между основным потоком и его клоном (событие, если у них другой PID !!).Или, может, я что-то не так делаю?

Кто-нибудь может мне помочь?Я был бы очень признателен и мог бы поделиться с вами дополнительной информацией о моей проблеме.

...