Управлять количеством подпроцессов, используемых для вызова внешних команд в Python - PullRequest
13 голосов
/ 21 марта 2012

Я понимаю, что использование подпроцесса является предпочтительным способом вызова внешней команды.

Но что, если я хочу запустить несколько команд в parall, но ограничить количество порождаемых процессов? Меня беспокоит то, что я не могу заблокировать подпроцессы. Например, если я позвоню

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

Затем процесс продолжится, не дожидаясь завершения cmd. Поэтому я не могу обернуть это в работника библиотеки multiprocessing.

Например, если я сделаю:

def worker(cmd): 
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);

pool = Pool( processes = 10 );
results =[pool.apply_async(worker, [cmd]) for cmd in cmd_list];
ans = [res.get() for res in results];

тогда каждый рабочий завершит работу и вернется после создания подпроцесса. Поэтому я не могу ограничить число процессов, генерируемых subprocess, используя Pool.

Как правильно ограничивать количество подпроцессов?

Ответы [ 2 ]

12 голосов
/ 21 марта 2012

Вам не нужно несколько процессов Python или даже потоков, чтобы ограничить максимальное количество параллельных подпроцессов:

from itertools import izip_longest
from subprocess import Popen, STDOUT

groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT)
          for cmd in commands)] * limit # itertools' grouper recipe
for processes in izip_longest(*groups): # run len(processes) == limit at a time
    for p in filter(None, processes):
        p.wait()

См. Итерация итератором по частям (из n) в Python?

Если вы хотите ограничить максимальное и минимальное количество параллельных подпроцессов, вы можете использовать пул потоков:

from multiprocessing.pool import ThreadPool
from subprocess import STDOUT, call

def run(cmd):
    return cmd, call(cmd, stdout=outputfile, stderr=STDOUT)

for cmd, rc in ThreadPool(limit).imap_unordered(run, commands):
    if rc != 0:
        print('{cmd} failed with exit status: {rc}'.format(**vars()))

Как только закончится любой из limit подпроцессов, aновый подпроцесс запускается для поддержки limit количества подпроцессов постоянно.

Или используя ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor # pip install futures
from subprocess import STDOUT, call

with ThreadPoolExecutor(max_workers=limit) as executor:
    for cmd in commands:
        executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT)

Вот простая реализация пула потоков:

import subprocess
from threading import Thread

try: from queue import Queue
except ImportError:
    from Queue import Queue # Python 2.x


def worker(queue):
    for cmd in iter(queue.get, None):
        subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT)

q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
for t in threads: # start workers
    t.daemon = True
    t.start()

for cmd in commands:  # feed commands to threads
    q.put_nowait(cmd)

for _ in threads: q.put(None) # signal no more commands
for t in threads: t.join()    # wait for completion

Чтобы избежать преждевременного выхода, добавьте исключениеобработка.

Если вы хотите захватить выходные данные подпроцесса в строку, см. Python: параллельное выполнение подпроцесса cat .

6 голосов
/ 21 марта 2012

Вы можете использовать subprocess.call, если хотите дождаться завершения команды.См. pydoc subprocess для получения дополнительной информации.

Вы также можете вызвать метод Popen.wait в своем работнике:

def worker(cmd): 
    p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);
    p.wait()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...