Тупик возникает при использовании multiprocessing.Pipe () в функции multiprocessing.Pool.apply_asyn c - PullRequest
1 голос
/ 09 июля 2020

Я пытаюсь использовать multiprocessing.Pipe() как средство коммуникации в нескольких процессах. Но когда я передаю канал в pool.apply_async() в качестве параметра, возникает проблема взаимоблокировки. Почему?

Код и вывод:

# coding=utf-8
from multiprocess import pool
from multiprocessing import Pipe, Pool, set_start_method, get_context, Queue, Manager, Process
import time


def worker_process(name, _out_pipe, _in_pipe):

    # _out_pipe.close()

    for x in range(10):
        _in_pipe.send(name + ':' + str(x))
        print(name + ' send value :' + str(x))
        time.sleep(0.1)

    # _in_pipe.close()

if __name__ == '__main__':
    set_start_method('spawn')
    print(get_context())
    # with pool.Pool() as pool:
    with Pool() as pool:
        pool.apply_async(worker_process, ('son_p1', out_pipe, in_pipe))
        pool.apply_async(worker_process, ('son_p2', out_pipe, in_pipe))
        pool.apply_async(worker_process, ('son_p3', out_pipe, in_pipe))
        # pool.apply(worker_process, ('son_p1', out_pipe, in_pipe))
        # pool.apply(worker_process, ('son_p2', out_pipe, in_pipe))
        # pool.apply(worker_process, ('son_p3', out_pipe, in_pipe))
        pool.close()
        pool.join()

    while out_pipe.poll():
        print(out_pipe.recv())

    # in_pipe.close()
    # out_pipe.close()

Process ForkPoolWorker-2:
Process ForkPoolWorker-5:
Process ForkPoolWorker-1:
Process ForkPoolWorker-6:
Process ForkPoolWorker-8:
Process ForkPoolWorker-9:
Process ForkPoolWorker-7:
Process ForkPoolWorker-4:
Traceback (most recent call last):
  File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/pipe_example.py", line 34, in <module>
    pool.join()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 662, in join
    self._worker_handler.join()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):

Но когда я заменяю multiprocessing.Pool() на multiprocess.pool.Pool() или multiprocessing.Pool().apply_async() на multiprocessing.Pool().apply(), программа будет работать нормально. Почему?

1 Ответ

1 голос
/ 09 июля 2020

Попробуйте использовать spawn для создания новых процессов вместо fork, как указано в этом сообщении в блоге .

from multiprocessing import set_start_method

if __name__ == '__main__':
    set_start_method("spawn")
    print(get_context())

Кроме того, попробуйте завершить свой пул с помощью finally. Это появилось как проблема в другом месте .

try:
    with Pool() as pool:
        pool.apply_async(worker_process, ('son_p1', out_pipe, in_pipe))
        pool.apply_async(worker_process, ('son_p2', out_pipe, in_pipe))
        pool.apply_async(worker_process, ('son_p3', out_pipe, in_pipe))
        pool.close()
        pool.join()

    while out_pipe.poll():
        print(out_pipe.recv())
finally:
    pool.terminate()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...