Вот моя проблема.Я работаю над существующим многопроцессорным кодом Python (работающим на одном узле), и цель состоит в том, чтобы выполнить многоузловое выполнение этого кода с использованием MPI для Python (mpi4py).
Связь MPI между 2 узлами осуществляется только основным потоком каждого процесса MPI (поток, который вызывает MPI.Init (), вы можете узнать, какой из них это сделал, вызвав функцию MPI.Is_thread_main ().) но, к сожалению, не работают.
Фактически MPI-коммуникация не работает ПОСЛЕ запуска процессов python.
Чтобы объяснить проблему, я переписываю короткий код с точно такой же проблемой.
import os
import psutil
import multiprocessing
import numpy as np
import Queue
import time
from mpi4py import rc
rc.initialize = False # if = True, The Init is done when "from mpi4py import MPI" is called
rc.thread_level = 'funneled'
from mpi4py import MPI
def infiniteloop(arg):
while True:
print arg
time.sleep(1)
# Check if the worker think it's the thread who called MPI.Init()
print "Worker is Main Thread %s" %(MPI.Is_thread_main())
print "Rank %d on %s, Process PID for worker = %d" %(MPI.COMM_WORLD.Get_rank(),MPI.Get_processor_name(),os.getpid())
if __name__ == '__main__':
MPI.Init()
# In the code I'm working on, MPI.Init() has to be done before the miltiprocess initialization
proc = multiprocessing.Process(target=infiniteloop, args=('RunningWorker',))
proc.start()
print "MultiProcess Stared"
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size_mpi = comm.Get_size()
while True:
print "Running Main Thread"
print "Main Thread is Main Thread %s" %( MPI.Is_thread_main())
print "Rank %d on %s, Process PID for main = %s" %(MPI.COMM_WORLD.Get_rank(),MPI.Get_processor_name(),os.getpid())
print "Rank %d on %s, rc.thread_level = %s" %((MPI.COMM_WORLD.Get_rank(),MPI.Get_processor_name(), rc.thread_level))
time.sleep(1)
# Start MPI Communication, It is just an example of 2D array communication which I know it works
print "Start MPI Transfert"
#*************** Multiple SEND AND RECEIVE for 2D Array fill randomly
SumPsfmean = None
TransPsfmean = None
TABSIZE = 100
# Create 100x100 array with random np.float64 values
# (Because it's really close from the case I'm intersting for)
# Row per row communication
if rank == 0:
psfmean = np.random.rand(TABSIZE,TABSIZE)
print psfmean.dtype
else:
psfmean = np.random.rand(TABSIZE,TABSIZE)
psfmean_shape = psfmean.shape
if rank == 0:
SumPsfmean = np.array(range(psfmean.size*(size_mpi-1)), dtype = np.float64)
SumPsfmean.shape = (size_mpi-1, psfmean_shape[0], psfmean_shape[1])
TransPsfmean = np.array(range(psfmean[0].size), dtype = np.float64)
for i in range(psfmean_shape[1]):
print "Rank %d : Send&Receive nb %d" %(rank, i)
if rank == 0:
comm.Recv(TransPsfmean, source=1, tag=i)
elif rank ==1:
comm.Send(psfmean[i], dest=0, tag=i)
print"End Send&Receive %d" %i
if rank == 0:
for k in range(size_mpi):
if k != 0:
SumPsfmean[k-1][i] = TransPsfmean
proc.join()
В этом примере только 2 процесса MPI создаются на 2 разных узлах.Таким образом, после инициализации MPI основная функция создаст и запустит процесс Python, затем запустит связь между процессом 2 MPI.MPI_rank_1 будет посылать строку за строкой информацию о двумерном массиве (100 строк), а MPI_rank_0 будет ожидать получения этой информации.
Результат:
[0] MultiProcess Stared
[0] Running Main Thread
[0] Main Thread is Main Thread : True
[0] Rank 0 on genji271, Process PID for main = 227040
[0] Rank 0 on genji271, rc.thread_level = funneled
[1] MultiProcess Stared
[1] Running Main Thread
[1] Main Thread is Main Thread : True
[1] Rank 1 on genji272, Process PID for main = 211028
[1] Rank 1 on genji272, rc.thread_level = funneled
[0] RunningWorker[0]
[1] RunningWorker
[0] Start MPI Transfert
[0] float64
[1] Start MPI Transfert
[1] Rank 1 : Send&Receive nb 0
[1] End Send&Receive 0
[1] Rank 1 : Send&Receive nb 1
[1] End Send&Receive 1
[1] Rank 1 : Send&Receive nb 2
[1] End Send&Receive 2
[1] Rank 1 : Send&Receive nb 3
[1] End Send&Receive 3
[1] Rank 1 : Send&Receive nb 4
[1] End Send&Receive 4
[1] Rank 1 : Send&Receive nb 5
[1] End Send&Receive 5[1]
[1] Rank 1 : Send&Receive nb 6[1]
[1] End Send&Receive 6
[1] Rank 1 : Send&Receive nb 7
[1] End Send&Receive 7[1]
[1] Rank 1 : Send&Receive nb 8
[1] End Send&Receive 8
[1] Rank 1 : Send&Receive nb 9
[1] End Send&Receive 9
[1] Rank 1 : Send&Receive nb 10
[1] End Send&Receive 10
[1] Rank 1 : Send&Receive nb 11
[1] End Send&Receive 11
[1] Rank 1 : Send&Receive nb 12
[1] End Send&Receive 12
[1] Rank 1 : Send&Receive nb 13
[1] End Send&Receive 13
[1] Rank 1 : Send&Receive nb 14[1]
[1] End Send&Receive 14
[1] Rank 1 : Send&Receive nb 15
[0] Rank 0 : Send&Receive nb 0
[0] Worker is Main Thread : True
[0] Rank 0 on genji271, Process PID for worker = 227046
[0] RunningWorker
[1] Worker is Main Thread : True
[1] Rank 1 on genji272, Process PID for worker = 211033
[1] RunningWorker
[0] Worker is Main Thread : True
[0] Rank 0 on genji271, Process PID for worker = 227046
[0] RunningWorker
[1] Worker is Main Thread : True
[1] Rank 1 on genji272, Process PID for worker = 211033
[1] RunningWorker
[0] Worker is Main Thread : True
[0] Rank 0 on genji271, Process PID for worker = 227046
...
Как видите, рабочий и основной потоки оба думают, что они являются потоками, вызвавшими MPI.Init ().Более того, обмен данными MPI прекращается между процессом 2 MPI (он прекрасно работает без процесса Python или когда MPI.Init выполняется после создания процесса !!).Фактически, MPI_rank_0, который предполагает получать строки, застревает в первой итерации и никогда не получает эту первую строку.
Я (думаю) понимаю, что процесс python является своего рода клоном основного потока (или, по крайней мере, разделяет / копирует память основного потока при создании процесса).Так что возможно, что MPI не видит разницы между основным потоком и его клоном (событие, если у них другой PID !!).Или, может, я что-то не так делаю?
Кто-нибудь может мне помочь?Я был бы очень признателен и мог бы поделиться с вами дополнительной информацией о моей проблеме.