У меня есть код Python, с помощью которого у меня есть два процесса, созданные основным.Один из них (передатчик) генерирует группу данных и отправляет их другому процессу (получателю) для их обработки.В конце передачи передатчик сигнализирует об окончании передачи, помещая сигнал в Pipe (), и получатель ждет, пока не получит сигнал, затем он пытается использовать все элементы в очереди.Однако проблема заключается в том, что процесс-передатчик не присоединится к основному процессу в конце.Я много искал и не мог найти решение для этого.Должна быть проблема с закрытием Pipe () или Queue () в передатчике.
from multiprocessing import Process, Pipe, Lock, Queue
import multiprocessing as mp
from time import sleep
from threading import Thread
import threading
import time
from numpy import random as rnd
def f1(tx, comm):
for ij in range(100):
tx.put(rnd.randn(1000,))
comm.send(1)
comm.close()
print('\nThe transmitter is done\n')
def g(rx: Queue, comm, l):
msg_lock = threading.Lock()
msg = 0
Rtimes = []
def readingQ(msg_lock):
while True:
msg_lock.acquire()
flg = msg
msg_lock.release()
if flg == 1:
l.acquire()
print('Waiting to consume all items')
l.release()
while rx.empty() is not True:
tim1 = time.time()
rx.get()
Rtimes.append(time.time()-tim1)
print('The items are consumed ...')
rx.close()
rx.join_thread()
l.acquire()
print('The background thread in queue is joined! ')
l.release()
break
tim1 = time.time()
print('rx Q size:', rx.qsize()) #, 'retrieved value: ',rx.get())
rx.get()
Rtimes.append(time.time()-tim1)
sleep(.001)
l.acquire()
print('Time taking to get the items: \n', Rtimes)
l.release()
def getmsg(msg_lock):
nonlocal msg
msg_lock.acquire()
msg = comm.recv()
msg_lock.release()
print('message is received: ', msg)
comm.close()
t1 = Thread(target=getmsg, args=(msg_lock,))
t2 = Thread(target=readingQ, args=(msg_lock,))
t2.start()
t1.start()
t1.join()
print('The message box thread is joined')
t2.join()
print('The consumer thread is joined')
if __name__ == '__main__':
mp.set_start_method('spawn')
printLock = Lock()
p = []
comm1, comm2 = Pipe(False)
print('comm1 fileno {} and comm2 fileno {}'.format(comm1.fileno(),comm2.fileno()))
qs = Queue()
p.append(Process(target=g, args=(qs,comm1, printLock)))
p.append(Process(target=f1, args=(qs,comm2)))
for o in p:
o.start()
printLock.acquire()
print('check if the second child process is alive or not: ', p[1].is_alive())
print('active children: ', mp.active_children())
print('Number of CPUs I have: ', mp.cpu_count())
printLock.release()
p[0].join()
print('The receiver successfully joined')
p[1].join()
print('The transmitter successfully joined')
Истинный код должен присоединиться к передатчику.