Я пытаюсь использовать multiprocessing.Pipe()
как средство коммуникации в нескольких процессах. Но когда я передаю канал в pool.apply_async()
в качестве параметра, возникает проблема взаимоблокировки. Почему?
Код и вывод:
# coding=utf-8
from multiprocess import pool
from multiprocessing import Pipe, Pool, set_start_method, get_context, Queue, Manager, Process
import time
def worker_process(name, _out_pipe, _in_pipe):
# _out_pipe.close()
for x in range(10):
_in_pipe.send(name + ':' + str(x))
print(name + ' send value :' + str(x))
time.sleep(0.1)
# _in_pipe.close()
if __name__ == '__main__':
set_start_method('spawn')
print(get_context())
# with pool.Pool() as pool:
with Pool() as pool:
pool.apply_async(worker_process, ('son_p1', out_pipe, in_pipe))
pool.apply_async(worker_process, ('son_p2', out_pipe, in_pipe))
pool.apply_async(worker_process, ('son_p3', out_pipe, in_pipe))
# pool.apply(worker_process, ('son_p1', out_pipe, in_pipe))
# pool.apply(worker_process, ('son_p2', out_pipe, in_pipe))
# pool.apply(worker_process, ('son_p3', out_pipe, in_pipe))
pool.close()
pool.join()
while out_pipe.poll():
print(out_pipe.recv())
# in_pipe.close()
# out_pipe.close()
Process ForkPoolWorker-2:
Process ForkPoolWorker-5:
Process ForkPoolWorker-1:
Process ForkPoolWorker-6:
Process ForkPoolWorker-8:
Process ForkPoolWorker-9:
Process ForkPoolWorker-7:
Process ForkPoolWorker-4:
Traceback (most recent call last):
File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/pipe_example.py", line 34, in <module>
pool.join()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 662, in join
self._worker_handler.join()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 1011, in join
self._wait_for_tstate_lock()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
Но когда я заменяю multiprocessing.Pool()
на multiprocess.pool.Pool()
или multiprocessing.Pool().apply_async()
на multiprocessing.Pool().apply()
, программа будет работать нормально. Почему?