multiprocessing.Queue поведение при разветвлении - PullRequest
1 голос
/ 20 января 2020

Через несколько os.fork() я пытаюсь обмениваться данными с детьми. Для этого я использую multiprocessing.Queue экземпляры. Очереди работают правильно, когда родитель ставит, а дети получают; но не наоборот.

Мой пример кода:

import os
import multiprocessing as mp
from queue import Empty

if __name__ == '__main__':

    n_workers = 5

    forward_queue = mp.Queue()
    pids_queue = mp.Queue()

    for n in range(n_workers):
        forward_queue.put(n)

    for n in range(n_workers):
        child = os.fork()
        if child:
            pass
        else:
            my_number = forward_queue.get()
            print('pid={} here, my number is {}'.format(os.getpid(), my_number))
            pids_queue.put(os.getpid())
            os._exit(0)  # correct way to exit a fork according to docs

    while True:
        try:
            pid_of_child = pids_queue.get(timeout=5)
        except Empty:
            print('no more pids')
            break
        else:
            print('one of my children had this pid={}'.format(pid_of_child))

вывод, который я получаю:

pid=19715 here, my number is 0
pid=19716 here, my number is 1
pid=19717 here, my number is 2
pid=19721 here, my number is 3
pid=19718 here, my number is 4
no more pids

вывод, который я ожидаю:

pid=19715 here, my number is 0
pid=19716 here, my number is 1
pid=19717 here, my number is 2
pid=19721 here, my number is 3
pid=19718 here, my number is 4
one of my children had this pid=19715
one of my children had this pid=19716
one of my children had this pid=19717
one of my children had this pid=19721
one of my children had this pid=19718
no more pids

Может кто-нибудь объяснить, почему это происходит?

1 Ответ

2 голосов
/ 20 января 2020

попробуйте это перед выходом из форка:

pids_queue.close()
pids_queue.join_thread()

Проблема в том, как работают очереди. После помещения значения в очередь запускается фоновый поток для передачи элемента в канал. Когда вы немедленно вызовете os._exit, поток будет закрыт. Для такого типа проблем разработаны методы .close и .join_thread.

...