Как управлять пулом процессов? - PullRequest
0 голосов
/ 28 сентября 2018

Я пытаюсь настроить многопроцессорный пул в Windows 10.

Обычно некоторые процессоры (в моем случае 12) должны читать из Qin и записывать результаты в Qout.При записи 'end' в Qin процесс должен остановиться.

По какой-то причине процесс зависает.

Я разработал простую версию:

from multiprocessing import Pool, Queue, Event
import os,time


def worker( Qin, Qout, event):
    time.sleep(5)
    while True:
        item = Qin.get()
        if item == 'end':
            event.set()
        else:
            Qout.put(item)
        time.sleep(1)

def manager():
    Qin,Qout,event= Queue(), Queue(), Event()
    processes = os.cpu_count()
    pool = Pool(processes=processes)
    for _ in range(processes):
        pool.apply_async(worker,args= (Qin,Qout,event,))
    for i in range(100):
        print(i)
        Qin.put(i)

    Qin.put('end')

    pool.close()
    event.wait()
    pool.terminate()
    return Qout

Qout = manager()

Ответы [ 2 ]

0 голосов
/ 29 сентября 2018

Я думаю, причина того, что ваш код зависает, состоит в том, что все рабочие задачи в конечном итоге ожидают, что что-то появится во входной очереди со строкой item = Qin.get() одновременно, потому что get() "блокирует", ожидая, что что-то будет помещенов очереди.Один из способов избежать этого - использовать вместо этого неблокирующий метод get_nowait().Для этого требуется, чтобы код обрабатывал любое исключение Empty, которое он может вызвать, но при этом не требуется, чтобы какое-либо дальнейшее выполнение в этом процессе было эффективно остановлено на этом этапе.

Также, чтобы все работало, вам необходимо создать и использоватьmultiprocessing.Manager, который создает серверный процесс, который содержит объекты Python и позволяет другим процессам манипулировать ими через прокси.См. Часть «Серверный процесс» в разделе Состояние общего доступа между процессами в документации.

Кроме того, при использовании multiprocessing в Windows очень важноУбедитесь, что код основного процесса выполняется условно, поместив его в оператор if __name__ == '__main__':.Это из-за того, как модуль был реализован на этой платформе - в противном случае этот код будет выполняться снова каждый раз, когда запускается другая параллельная задача (которая включает в себя import их редактирование).

Ниже приведен вашкод с необходимой модификацией, поэтому он использует multiprocessing.Manager.Примечание. Я изменил имя вашей функции manager(), чтобы избежать путаницы с multiprocessing.Manager, который используется для создания общих объектов.

import multiprocessing
from queue import Empty as QueueEmpty
import os
import time

END_MARKER = 'end'


def worker(id, Qin, Qout, event):
    while True:
        try:
            item = Qin.get_nowait()  # Non-blocking.
        except QueueEmpty:
            if event.is_set():  # Last item seen?
               break
            continue # Keep polling.

        if item == END_MARKER:  # Last item?
            event.set()
            break  # Quit.

        Qout.put('{} via worker {}'.format(item, id))
        time.sleep(.25)


def pool_manager():
    processes = os.cpu_count()
    pool = multiprocessing.Pool(processes=processes)
    manager = multiprocessing.Manager()
    Qin, Qout, event = manager.Queue(), manager.Queue(), manager.Event()

    for i in range(100):
        Qin.put(i)

    Qin.put(END_MARKER)

    for id in range(processes):
        pool.apply_async(worker, (id, Qin, Qout, event))

    pool.close()  # Done adding tasks.
    pool.join()  # Wait for all tasks to complete.

    return Qout


if __name__ == '__main__':
    print('Processing')
    Qout = pool_manager()

    print('Contents of Qout:')
    while not Qout.empty():
        item = Qout.get()
        print(' ', item)

    print('End of script')
0 голосов
/ 28 сентября 2018

Вы должны понимать, как правильно работает асинхронное программирование в Python.Когда вы вызываете apply_async, вы получаете объект Future.Реализация Queue в python использует системный канал для передачи данных от одного процесса другому и несколько семафоров для защиты чтения и записи в этом канале.

from multiprocessing import Pool, Queue, Event
import os
import time
import multiprocessing

def worker( Qin, Qout, event):
    print('worker')
    time.sleep(1)
    event.set()

def manager():
    processes = multiprocessing.cpu_count()
    m = multiprocessing.Manager()
    Qin = m.Queue()
    Qout = m.Queue()
    event = m.Event()
    pool = Pool(processes=processes)
    result = pool.apply_async(worker, (Qin, Qout, event))
    result.get()
    pool.close()
    event.wait()
    return Qout

if __name__ == '__main__':
    Qout = manager()
...