Ниже приведен пример использования многопроцессорности. Это модель пула процессов. Это не так просто, как могло бы быть, но относительно близко по структуре к коду, который я на самом деле использую. Он также использует sqlalchemy, извините.
Мой вопрос - у меня в настоящее время есть ситуация, когда у меня есть относительно долго работающий скрипт Python, который выполняет ряд функций, каждая из которых похожа на приведенный ниже код, поэтому родительский процесс одинаков во всех случаях. Другими словами, несколько пулов создаются одним скриптом Python. (Полагаю, я не должен делать это таким образом, но альтернатива состоит в том, чтобы использовать что-то вроде os.system и subprocess.) Проблема в том, что эти процессы зависают и держатся за память. В документах говорится, что эти процессы-демоны должны оставаться до завершения родительского процесса, но что если родительский процесс продолжит генерировать другой пул или процессы и не завершится немедленно.
Вызов terminate () работает, но это не выглядит вежливо. Есть ли хороший способ попросить процессы прекратить красиво? То есть убери за собой и уходи сейчас, мне нужно запустить следующий бассейн?
Я также попытался вызвать join () для процессов. Согласно документации это означает ожидание завершения процессов. Что делать, если они не планируют прекратить? На самом деле происходит зависание процесса.
Заранее спасибо.
С уважением, Фахим.
import multiprocessing, time
class Worker(multiprocessing.Process):
"""Process executing tasks from a given tasks queue"""
def __init__(self, queue, num):
multiprocessing.Process.__init__(self)
self.num = num
self.queue = queue
self.daemon = True
def run(self):
import traceback
while True:
func, args, kargs = self.queue.get()
try:
print "trying %s with args %s"%(func.__name__, args)
func(*args, **kargs)
except:
traceback.print_exc()
self.queue.task_done()
class ProcessPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.queue = multiprocessing.JoinableQueue()
self.workerlist = []
self.num = num_threads
for i in range(num_threads):
self.workerlist.append(Worker(self.queue, i))
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.queue.put((func, args, kargs))
def start(self):
for w in self.workerlist:
w.start()
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.queue.join()
for worker in self.workerlist:
print worker.__dict__
#worker.terminate() <--- terminate used here
worker.join() <--- join used here
start = time.time()
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)
def make_foo(i):
t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))
conn = db.connect()
for i in range(10):
conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
for i in range(10):
make_foo(i)
m.create_all()
def do(i, dbstring):
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );"%i)
Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );"%i)
Session.commit()
pool = ProcessPool(5)
for i in range(10):
pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()