Как порождать параллельные дочерние процессы в многопроцессорной системе? - PullRequest
42 голосов
/ 19 мая 2009

У меня есть скрипт Python, который я хочу использовать в качестве контроллера для другого скрипта Python. У меня есть сервер с 64 процессорами, поэтому я хочу создать до 64 дочерних процессов этого второго скрипта Python. Дочерний скрипт называется:

$ python create_graphs.py --name=NAME

где NAME - это что-то вроде XYZ, ABC, NYU и т. Д.

В моем скрипте родительского контроллера я извлекаю переменную name из списка:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

Итак, мой вопрос: каков наилучший способ порождать эти процессы в детстве? Я хочу ограничить количество детей до 64 одновременно, поэтому необходимо отслеживать состояние (если дочерний процесс завершен или нет), чтобы я мог эффективно поддерживать работу всего поколения.

Я изучил использование пакета подпроцесса, но отклонил его, потому что он порождает только одного ребенка за раз. Наконец-то я нашел многопроцессорный пакет, но признаюсь, что он перегружен документацией по всем потокам и подпроцессам.

Прямо сейчас мой сценарий использует subprocess.call, чтобы порождать только одного ребенка за раз, и выглядит так:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

Я действительно хочу, чтобы это породило 64 детей одновременно. В других вопросах о стековом потоке я видел людей, использующих очередь, но, похоже, это приводит к снижению производительности?

Ответы [ 4 ]

60 голосов
/ 20 мая 2009

Что вам нужно, так это класс процессов в многопроцессорной среде.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

А вот пример расчета, чтобы его было легче понять. Следующее разделит 10000 задач на N процессов, где N - количество процессоров. Обратите внимание, что я передаю None как число процессов. Это приведет к тому, что класс Pool будет использовать cpu_count для числа процессов ( ссылка )

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results
2 голосов
/ 17 июня 2009

Вот решение, которое я придумал, основываясь на комментариях Нади и Джима. Я не уверен, что это лучший способ, но он работает. Вызываемый исходный дочерний сценарий должен быть сценарием оболочки, поскольку мне нужно использовать некоторые сторонние приложения, включая Matlab. Поэтому мне пришлось взять его из Python и кодировать в bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

Это кажется разумным решением? Я пытался использовать формат цикла while Джима, но мой сценарий просто ничего не возвращал. Я не уверен, почему это будет. Вот вывод, когда я запускаю скрипт с циклом Jim 'while', заменяющим цикл 'for':

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

Когда я запускаю его с помощью цикла for, я получаю что-то более значимое:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

Так что это работает, и я счастлив. Тем не менее, я до сих пор не понимаю, почему я не могу использовать цикл стилей Jim 'while' вместо цикла 'for', который я использую. Спасибо за помощь - я впечатлен широтой знаний @ stackoverflow.

1 голос
/ 20 мая 2009

Я не думаю, что вам нужна очередь, если вы не собираетесь извлекать данные из приложений (что, если вам нужны данные, я думаю, что в любом случае может быть проще добавить их в базу данных)

но примерьте размер:

поместите содержимое вашего скрипта create_graphs.py в функцию под названием "create_graphs"

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

Я знаю, что это приведет к тому, что количество потоков будет на 1 меньше, чем у процессоров, что, вероятно, хорошо, но оставляет процессору управление потоками, дисковым вводом-выводом и другими событиями, происходящими на компьютере. Если вы решили использовать последнее ядро, просто добавьте к нему одно

edit : Я думаю, что, возможно, неправильно истолковал цель my_list. Вам не нужно my_list для отслеживания потоков вообще (так как на них все ссылаются элементы в списке threads). Но это прекрасный способ подачи входных данных процессов - или даже лучше: используйте функцию генератора;)

Цель my_list и threads

my_list содержит данные, которые необходимо обработать в вашей функции
threads это просто список текущих запущенных тем

цикл while делает две вещи, запускает новые потоки для обработки данных и проверяет, запущены ли какие-либо потоки.

Так что, если у вас есть (а) больше данных для обработки или (б) потоки, которые еще не завершены ... вы хотите, чтобы программа продолжала работать. Как только оба списка опустеют, они получат значение False, и цикл while завершится

1 голос
/ 20 мая 2009

Я бы определенно использовал многопроцессорность вместо того, чтобы использовать свое собственное решение с использованием подпроцесса.

...