Python: вопросы о подпроцессах, конвейерных данных и многопроцессорности - PullRequest
0 голосов
/ 30 июня 2011

У меня есть программа на Python, которая использует Popen для вызова тестовой программы на C ++. Тестовая программа C ++ просто записывает 0-99999 в стандартный вывод. Программа Python имеет две функции, которые должны выполняться как отдельные процессы. Одна функция, funcA, должна запустить программу на C ++, прочитать целые числа из канала stdout и вставить эти целые числа в общую очередь. Другая функция, funcB, должна читать и печатать целые числа в очереди, пока очередь не станет пустой. У меня есть некоторые проблемы / вопросы, которые я опубликую ниже, вместе с моим кодом ниже.

  1. Как правильно использовать funcA для чтения из стандартного вывода программы C ++ до его завершения (программы C ++)?
  2. Каким образом funcB может читать из общей очереди до тех пор, пока все целые не будут обработаны?

Мой текущий метод для вопроса 1, я верю, работает, но я знаю, что могут быть некоторые проблемы, которые я не проверяю, такие как заполнение очереди. Кроме того, все числа не распечатываются (останавливается примерно на 98000), и я думаю, что это может быть связано с тем, что funcA прерывает и прерывает общую очередь? Я не совсем уверен, что делать с вопросом 2, потому что в документации сказано, что нельзя полагаться на empty () в атмосфере параллельной обработки, и я не хочу использовать while (1).

import multiprocessing
import subprocess
import Queue

def funcA(intQueue):
    # call C++ program
    handle = subprocess.Popen(['../C++/C++.exe'], stdout=subprocess.PIPE)

    while(handle.returncode == None):
        handle.stdout.readline()
        intQueue.put(handle.stdout.readline())
        handle.poll()

def funcB(intQueue):
    try:
        while(1):
            print intQueue.get(True, 2)
    except Queue.Empty:
        pass

if __name__ == "__main__":
    # shared Queue for all the processes
    intQueue = multiprocessing.Queue()

    # producer - receives ints from the C++ stdout and inserts into Queue
    multiprocessing.Process(target=funcA, args=(intQueue,)).start()

    # consumer - prints ints from the Queue
    multiprocessing.Process(target=funcB, args=(intQueue,)).start()

Ответы [ 2 ]

2 голосов
/ 30 июня 2011

Используйте communicate метод Popen, например:

handle = subprocess.Popen(['../C++/C++.exe'], stdout=subprocess.PIPE)
out, err = handle.communicate()    # this will block until the underlying subprocess exits

Что касается очереди, структура данных определяет методы для запроса указанной очереди, если она заполнена или пуста. Используйте их.

1 голос
/ 19 июля 2011

В случае, если кто-нибудь сталкивался с такой же проблемой:

Для вопроса 1 я использовал while (1), которое прерывается, когда список, возвращаемый из разбиения handle.stdout.read (), имеет длину 1(это означает, что ничего не было возвращено из трубы).

Для вопроса 2 я использовал метод отравленной таблетки, описанный в этом посте: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

...