Python Process Pool не является демоном? - PullRequest
71 голосов
/ 07 августа 2011

Можно ли создать пул python, который не является демоническим?Я хочу, чтобы пул мог вызывать функцию, в которой есть другой пул.

Я хочу это, потому что процессы deamon не могут создавать процессы.В частности, это приведет к ошибке:

AssertionError: daemonic processes are not allowed to have children

Например, рассмотрим сценарий, в котором function_a имеет пул, который запускает function_b, который имеет пул, который запускает function_c.Эта цепочка функций завершится ошибкой, потому что function_b выполняется в процессе демона, а процессы демона не могут создавать процессы.

Ответы [ 5 ]

96 голосов
/ 22 января 2012

Класс multiprocessing.pool.Pool создает рабочие процессы в своем методе __init__, делает их демоническими и запускает их, и невозможно переустановить их атрибут daemon в False до их запуска (ипосле этого это больше не разрешено).Но вы можете создать свой собственный подкласс multiprocesing.pool.Pool (multiprocessing.Pool - просто функция-обертка) и заменить свой собственный подкласс multiprocessing.Process, который всегда не является демоническим, для использования в рабочих процессах.

Вот полный пример того, как это сделать.Важными частями являются два класса NoDaemonProcess и MyPool наверху и для вызова pool.close() и pool.join() в вашем экземпляре MyPool в конце.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
13 голосов
/ 03 декабря 2015

Модуль multiprocessing имеет приятный интерфейс для использования пулов с процессами или .В зависимости от вашего текущего варианта использования, вы можете рассмотреть возможность использования multiprocessing.pool.ThreadPool для внешнего пула, что приведет к потокам (которые позволяют порождать процессы изнутри) в отличие от процессов.

Это может быть ограничено GIL, но в моем конкретном случае (я проверял оба) , время запуска процессов из внешнего Pool, как было создано здесь , намного перевешивало решениес ThreadPool.


Действительно просто поменять Processes на Threads.Узнайте больше о том, как использовать ThreadPool решение здесь или здесь .

11 голосов
/ 07 ноября 2018

У меня была необходимость использовать недемонический пул в Python 3.7, и в итоге я адаптировал код, размещенный в принятом ответе. Ниже приведен фрагмент, который создает недемонический пул:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

Поскольку текущая реализация multiprocessing подверглась существенному рефакторингу с учетом контекста, нам необходимо предоставить класс NoDaemonContext, который имеет наш NoDaemonProcess в качестве атрибута. MyPool будет использовать этот контекст вместо контекста по умолчанию.

Тем не менее, я должен предупредить, что есть как минимум 2 предостережения для этого подхода:

  1. Он по-прежнему зависит от деталей реализации пакета multiprocessing и поэтому может выйти из строя в любое время.
  2. Существуют веские причины, по которым multiprocessing усложнил использование недемонических процессов, многие из которых объясняются здесь . Наиболее убедительным, на мой взгляд, является:

    Что касается разрешения дочерним потокам порождать собственных дочерних потоков, используя подпроцесс рискует создать маленькую армию зомби 'внуки', если родительские или дочерние потоки завершаются раньше подпроцесс завершается и возвращается.

2 голосов
/ 22 января 2019

В некоторых версиях Python замена стандартного пула на пользовательский может вызвать ошибку: AssertionError: group argument must be None for now.

Здесь Я нашел решение, которое может помочь:

class NonDaemonPool(multiprocessing.pool.Pool):
    def Process(self, *args, **kwds):
        proc = super(NonDaemonPool, self).Process(*args, **kwds)

        class NonDaemonProcess(proc.__class__):
            """Monkey-patch process to ensure it is never daemonized"""

            @property
            def daemon(self):
                return False

            @daemon.setter
            def daemon(self, val):
                pass

        proc.__class__ = NonDaemonProcess

        return proc
2 голосов
/ 24 ноября 2017

Проблема, с которой я столкнулся, заключалась в попытке импортировать глобальные переменные между модулями, в результате чего строка ProcessPool () оценивалась несколько раз.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Затем безопасно импортируйте из любого места в вашем коде

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         
...