мультипроцессинг и сборка мусора - PullRequest
12 голосов
/ 01 апреля 2012

В py2.6 + модуль multiprocessing предлагает класс Pool, так что можно сделать:

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        return pool.imap(...)

Однако, со стандартной реализацией Python в 2.7.2, этот подход вскоре приводит к «IOError: [Errno 24] Слишком много открытых файлов». Очевидно, объект pool никогда не получает мусор, поэтому его процессы никогда не завершаются, накапливая любые дескрипторы, открытые внутри. Я думаю, что это потому, что работает следующее:

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        result = pool.map(...)
        pool.terminate()
        return result

Я бы хотел сохранить "ленивый" подход итератора imap; как работает сборщик мусора в этом случае? Как исправить код?

Ответы [ 3 ]

8 голосов
/ 01 апреля 2012

В конце концов я передал ссылку pool и завершил ее вручную после завершения итератора pool.imap:

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        return pool, pool.imap(...)

    def call_stuff(self):
        pool, results = self.do_stuff()
        for result in results:
            # lazy evaluation of the imap
        pool.terminate()

В случае, если кто-нибудь наткнется на это решение в будущем: параметр chunksize очень важен в Pool.imap (в отличие от простого Pool.map, где это не имело значения). Я вручную установил его так, чтобы каждый процесс получал 1 + len(input) / len(pool) заданий. Оставив значение по умолчанию chunksize=1, я получил такую ​​же производительность, как если бы я вообще не использовал параллельную обработку ... плохо.

Я полагаю, что использование упорядоченного imap против упорядоченного map не дает никакой реальной выгоды, мне просто больше нравятся итераторы.

5 голосов
/ 01 апреля 2012

В Python у вас практически нет гарантии того, когда что-либо будет уничтожено, и в данном случае это не то, как мультипроцессорные пулы предназначены для использования.

Правильная вещь, которую нужно сделать - это поделиться однимобъединить несколько вызовов функции.Самый простой способ сделать это - сохранить пул в виде переменной класса (или, может быть, экземпляра):

class Dispatcher:
    pool = multiprocessing.Pool()
    def do_stuff(self, ...):
        result = self.pool.map(...)
        return result
3 голосов
/ 04 марта 2016

Действительно, даже если все пользовательские ссылки на объект pool удалены, и в коде очереди нет задач, и вся сборка мусора выполнена, тогда процессы по-прежнему остаются непригодными для использования зомби в операционная система - плюс у нас есть 3 служебных потока зомби из Pool висели (Python 2.7 и 3.4):

>>> del pool
>>> gc.collect()
0
>>> gc.garbage
[]
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>, 
 <Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>]

И далее Pool() добавит все больше и больше зомби процессов и потоков ... которые останутся до завершения основного процесса.

Для остановки такого пула зомби требуется специальный удар - через служебную ветку _handle_workers:

>>> ths = threading.enumerate()
>>> for th in ths: 
...     try: th.name, th._state, th._Thread__target
...     except AttributeError: pass
...     
('MainThread', 1, None)
('Thread-8', 0, <function _handle_tasks at 0x01462A30>)
('Thread-9', 0, <function _handle_results at 0x014629F0>)
('Thread-7', 0, <function _handle_workers at 0x01462A70>)
>>> ths[-1]._state = multiprocessing.pool.CLOSE  # or TERMINATE
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>]
>>> 

Это завершает другие потоки службы, а также завершает дочерние процессы.


Я думаю, что одна проблема в том, что в библиотеке Python есть ошибка утечки ресурсов , которую можно исправить, если правильно использовать weakref.

Другой момент заключается в том, что Pool создание и завершение обходятся дорого (включая 3 служебных потока на пул только для управления!), И обычно нет причин иметь гораздо больше рабочих процессов, чем процессорных ядер (высокая загрузка ЦП) или более ограниченного числа в соответствии с другим ограничивающим ресурсом (например, пропускная способность сети). Поэтому разумно рассматривать пул больше как глобальный ресурс единственного приложения (необязательно управляемый тайм-аутом), а не как быстрый объект, удерживаемый замыканием (или прерыванием () - обходной путь из-за ошибки) ,

Например:

try:
    _unused = pool   # reload safe global var
except NameError:
    pool = None

def get_pool():
    global pool
    if pool is None:
        atexit.register(stop_pool)
        pool = Pool(CPUCORES)
    return pool

def stop_pool():
    global pool
    if pool:
        pool.terminate()
        pool = None
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...