Зависать в скрипте Python с использованием SQLAlchemy и многопроцессорности - PullRequest
7 голосов
/ 09 января 2012

Рассмотрим следующий скрипт Python, который использует SQLAlchemy и модуль многопроцессорной обработки Python. Это с Python 2.6.6-8 + b1 (по умолчанию) и SQLAlchemy 0.6.3-3 (по умолчанию) в Debian squeeze. Это упрощенная версия некоторого фактического кода.

import multiprocessing
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ...
password = ...
dbname = ...
dbstring = "postgresql://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
db.dispose()

for i in range(10):
    make_foo(i)

m.create_all()

def do(kwargs):
    i, dbstring = kwargs['i'], kwargs['dbstring']

    db = create_engine(dbstring)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
    Session.commit()
    db.dispose()

pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i, 'dbstring':dbstring})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
r.get()
r.wait()
pool.close()
pool.join()

Этот скрипт зависает со следующим сообщением об ошибке.

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 4 arguments (2 given)', <class 'sqlalchemy.exc.ProgrammingError'>, ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n        ^\n',))

Конечно, синтаксическая ошибка здесь TRUNCATE foo%s;. Мой вопрос: почему процесс зависает, и могу ли я убедить его завершиться с ошибкой, не делая серьезных операций с моим кодом? Это поведение очень похоже на мой настоящий код.

Обратите внимание, что зависание не происходит, если оператор заменяется чем-то вроде print foobarbaz. Кроме того, зависание все еще происходит, если мы заменим

Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
Session.commit()
db.dispose()

просто Session.execute("TRUNCATE foo%s;")

Я использую предыдущую версию, потому что она ближе к тому, что делает мой настоящий код.

Кроме того, удаление multiprocessing с изображения и последовательное циклическое перемещение по таблицам устраняет зависание и просто завершается с ошибкой.

Я также немного озадачен формой ошибки, особенно битом TypeError: ('__init__() takes at least 4 arguments (2 given)'. Откуда эта ошибка? Кажется вероятным, что это где-то в multiprocessing коде.

Журналы PostgreSQL бесполезны. Я вижу много строк, таких как

2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR:  syntax error at or near "%" at character 28
2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT:  COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;

но ничего более не актуального.

ОБНОВЛЕНИЕ 1: Благодаря lbolla и его проницательному анализу мне удалось подать отчет об ошибках Python по этому поводу. См. Анализ sbt в этом отчете, а также здесь . См. Также отчет об ошибке Python Исправление выбора исключений . Итак, следуя объяснениям sbt, мы можем воспроизвести исходную ошибку с

import sqlalchemy.exc
e = sqlalchemy.exc.ProgrammingError("", {}, None)
type(e)(*e.args)

, что дает

Traceback (most recent call last):
  File "<stdin>", line 9, in <module>
TypeError: __init__() takes at least 4 arguments (2 given)

ОБНОВЛЕНИЕ 2: Это исправлено, по крайней мере для SQLAlchemy, Майком Байером, см. Отчет об ошибке Исключения StatementError не выбираются. . По предложению Майка, я также сообщил о подобной ошибке в psycopg2, хотя у меня не было (и нет) фактического примера поломки. Несмотря на это, они, по-видимому, исправили это, хотя они не дали подробностей исправления. См. psycopg исключения не могут быть засолены . В качестве примера, я также сообщил, что ошибка Python Исключения ConfigParser не могут быть изменены , что соответствует , о чем так и упоминается в вопросе . Кажется, они хотят проверить это.

В любом случае, похоже, что в обозримом будущем это будет оставаться проблемой, поскольку, по большому счету, разработчики Python, похоже, не знают об этой проблеме и поэтому не принимают меры против нее. Удивительно, но кажется, что людей, использующих многопроцессорность, недостаточно для того, чтобы это стало хорошо известной проблемой, или, может быть, они просто смирились с этим. Я надеюсь, что разработчики Python смогут исправить это хотя бы для Python 3, потому что это раздражает.

Я принял ответ Иболлы, так как без его объяснения того, как проблема связана с обработкой исключений, я бы, вероятно, никуда не пошел, чтобы понять это. Я также хочу поблагодарить sbt, который объяснил, что проблема заключалась в том, что Python не мог выбрать исключения. Я очень благодарен им обоим, и, пожалуйста, проголосуйте за их ответы. Спасибо.

ОБНОВЛЕНИЕ 3: я опубликовал следующий вопрос: Поймать необратимые исключения и повторно поднять .

Ответы [ 4 ]

11 голосов
/ 09 января 2012

Я полагаю, что TypeError взято из multiprocessing get.

Я удалил весь код БД из вашего скрипта.Взгляните на это:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Использование r.wait возвращает ожидаемый результат, но использование r.get повышает TypeError.Как описано в документах Python , используйте r.wait после map_async.

Edit : я должен изменить свой предыдущий ответ.Теперь я считаю, что TypeError происходит от SQLAlchemy.Я исправил свой сценарий, чтобы воспроизвести ошибку.

Редактировать 2 : похоже, проблема в том, что multiprocessing.pool не работает хорошо, если какой-либо работник вызывает исключение, конструктор которого требуетпараметр (см. также здесь ).

Я изменил свой скрипт, чтобы выделить это.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

В вашем случае, учитывая, что ваш код вызывает исключение SQLAlchemyЕдинственное решение, которое я могу придумать, - перехватить все исключения в функции do и вместо этого повторно повысить нормальный Exception.Примерно так:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Edit 3 : так что, похоже, это ошибка с Python , но правильные исключения в SQLAlchemy обойдут ее: следовательно,Я поднял проблему и с SQLAlchemy .

В качестве обходного пути я думаю, что решение в конце Edit 2 подойдет (обертывание обратных вызововв попытках, за исключением и повторного повышения).

2 голосов
/ 09 января 2012

Ошибка TypeError: ('__init__() takes at least 4 arguments (2 given) не связана с SQL, который вы пытаетесь выполнить, она связана с тем, как вы используете API SqlAlchemy.

Проблема в том, что вы пытаетесьвызовите execute в классе сеанса, а не в экземпляре этого сеанса.

Попробуйте это:

session = Session()
session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
session.commit()

Из документов :

Предполагается, что функция sessionmaker () будет вызываться в глобальной области приложения, а возвращаемый класс должен быть доступен для остальной части приложения как единый класс, используемый для создания экземпляров сеансов.

Таким образом, Session = sessionmaker() возвращает новый класс сеанса, а session = Session() возвращает экземпляр этого класса, который затем можно вызвать execute.

1 голос
/ 10 января 2012

(Это ответ на вопрос Фахима Митхи в комментарии о том, как использовать copy_reg для обхода неработающих классов исключений.)

Методы __init__() классов исключений SQLAlchemy, похоже, вызывают методы __init__() своего базового класса, но с другими аргументами. Это портит маринование.

Чтобы настроить выбор классов исключений sqlalchemy, вы можете использовать copy_reg , чтобы зарегистрировать свои собственные функции сокращения для этих классов.

Функция Reduce принимает аргумент obj и возвращает пару (callable_obj, args), так что можно создать копию obj, выполнив callable_obj(*args). Например

class StatementError(SQLAlchemyError):
    def __init__(self, message, statement, params, orig):
        SQLAlchemyError.__init__(self, message)
        self.statement = statement
        self.params = params
        self.orig = orig
    ...

можно «исправить», выполнив

import copy_reg, sqlalchemy.exc

def reduce_StatementError(e):
    message = e.args[0]
    args = (message, e.statement, e.params, e.orig)
    return (type(e), args)

copy_reg.pickle(sqlalchemy.exc.StatementError, reduce_StatementError)

В sqlalchemy.exc есть несколько других классов, которые необходимо исправить аналогичным образом. Но, надеюсь, вы поняли идею.


Если подумать, вместо того, чтобы исправлять каждый класс в отдельности, вы, вероятно, можете просто обезопасить метод __reduce__() базового класса исключений:

import sqlalchemy.exc

def rebuild_exc(cls, args, dic):
    e = Exception.__new__(cls)
    e.args = args
    e.__dict__.update(dic)
    return e

def __reduce__(e):
    return (rebuild_exc, (type(e), e.args, e.__dict__))

sqlalchemy.exc.SQLAlchemyError.__reduce__ = __reduce__
1 голос
/ 10 января 2012

Я не знаю о причине первоначального исключения.Однако проблемы многопроцессорной обработки с «плохими» исключениями на самом деле связаны с тем, как работает травление.Я думаю, что класс исключений sqlachemy не работает.

Если класс исключений имеет метод __init__(), который не вызывает BaseException.__init__() (прямо или косвенно), тогда self.args, вероятно, не будет установлен правильно.BaseException.__reduce__() (который используется протоколом рассола) предполагает, что копию исключения e можно воссоздать, просто выполнив

type(e)(*e.args)

Например

>>> e = ValueError("bad value")
>>> e
ValueError('bad value',)
>>> type(e)(*e.args)
ValueError('bad value',)

Если этоинвариант не удерживается, тогда травление / расслоение не удастся.Таким образом, экземпляры

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

могут быть засечены, но результат не может быть рассортирован:

>>> from cPickle import loads, dumps
>>> class BadExc(Exception):
...     def __init__(self, a):
...         '''Non-optional param in the constructor.'''
...         self.a = a
...
>>> loads(dumps(BadExc(1)))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: ('__init__() takes exactly 2 arguments (1 given)', <class '__main__.BadExc'>, ())

Но экземпляры

class GoodExc1(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        Exception.__init__(self, a)
        self.a = a

или

class GoodExc2(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.args = (a,)
        self.a = a

может быть успешно замаринован / не зарезан.

Поэтому вам следует попросить разработчиков sqlalchemy исправить их классы исключений.Тем временем вы, вероятно, можете использовать copy_reg.pickle() для переопределения BaseException.__reduce__() для проблемных классов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...