завершение процессов демона с помощью многопроцессорного модуля - PullRequest
2 голосов
/ 25 января 2011

Ниже приведен пример использования многопроцессорности. Это модель пула процессов. Это не так просто, как могло бы быть, но относительно близко по структуре к коду, который я на самом деле использую. Он также использует sqlalchemy, извините.

Мой вопрос - у меня в настоящее время есть ситуация, когда у меня есть относительно долго работающий скрипт Python, который выполняет ряд функций, каждая из которых похожа на приведенный ниже код, поэтому родительский процесс одинаков во всех случаях. Другими словами, несколько пулов создаются одним скриптом Python. (Полагаю, я не должен делать это таким образом, но альтернатива состоит в том, чтобы использовать что-то вроде os.system и subprocess.) Проблема в том, что эти процессы зависают и держатся за память. В документах говорится, что эти процессы-демоны должны оставаться до завершения родительского процесса, но что если родительский процесс продолжит генерировать другой пул или процессы и не завершится немедленно.

Вызов terminate () работает, но это не выглядит вежливо. Есть ли хороший способ попросить процессы прекратить красиво? То есть убери за собой и уходи сейчас, мне нужно запустить следующий бассейн?

Я также попытался вызвать join () для процессов. Согласно документации это означает ожидание завершения процессов. Что делать, если они не планируют прекратить? На самом деле происходит зависание процесса.

Заранее спасибо.

С уважением, Фахим.

import multiprocessing, time

class Worker(multiprocessing.Process):
    """Process executing tasks from a given tasks queue"""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                print "trying %s with args %s"%(func.__name__, args)
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = []
        self.num = num_threads
        for i in range(num_threads):
            self.workerlist.append(Worker(self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()
        for worker in self.workerlist:
            print worker.__dict__
            #worker.terminate()        <--- terminate used here  
            worker.join()              <--- join used here

start = time.time()

from sqlalchemy import *
from sqlalchemy.orm import *

dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()

for i in range(10):
    make_foo(i)

m.create_all()

def do(i, dbstring):
    dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
    db = create_engine(dbstring, echo=True)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );"%i)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );"%i)
    Session.commit()

pool = ProcessPool(5)
for i in range(10):
    pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()

Ответы [ 2 ]

3 голосов
/ 25 января 2011

Вы знаете, многопроцессорная уже имеет классы для рабочих пулов, верно?

Стандартным способом является отправка сообщения темам о выходе:

queue.put(("QUIT", None, None))

Затем проверьте это:

if func == "QUIT":
    return
1 голос
/ 06 декабря 2017

Мой способ борьбы с этим был:

import multiprocessing

for prc in multiprocessing.active_children():
    prc.terminate()

Мне это нравится больше, поэтому мне не нужно загрязнять рабочую функцию некоторыми выражениями if.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...