Сохранение прерванной транзакции, когда SQLAlchemy вызывает ProgrammingError - PullRequest
0 голосов
/ 25 апреля 2018

У меня немного необычная проблема с состоянием транзакции и обработкой ошибок в SQLAlchemy.Короткая версия: есть ли способ сохранить транзакцию, когда SQLAlchemy вызывает ProgrammingError и прерывает ее?

Фон

Я работаю над интеграционным тестомнабор для унаследованной кодовой базы.Сейчас я разрабатываю набор приспособлений, которые позволят нам запускать все тесты внутри транзакций, вдохновленные документацией SQLAlchemy .Общая парадигма включает в себя открытие соединения, запуск транзакции, привязку сеанса к этому соединению, а затем моделирование большинства методов доступа к базе данных, чтобы они использовали эту транзакцию.(Чтобы понять, как это выглядит, см. Код, приведенный в ссылке на документацию выше, включая примечание в конце.) Цель состоит в том, чтобы позволить себе запускать методы из кодовой базы, которые выполняют много обновлений базы данных вконтекст теста, с гарантией того, что любые побочные эффекты, которые могут изменить базу данных теста, будут отменены после завершения теста.

Моя проблема заключается в том, что код часто полагается на обработку ошибок DBAPI для достижения контроляпоток при выполнении запросов, и эти ошибки автоматически прерывают транзакции (за документацию psycopg2 ).Это создает проблему, поскольку мне нужно сохранить работу, которая была выполнена в этой транзакции, до момента возникновения ошибки, и мне нужно продолжать использовать транзакцию после обработки ошибки.

Вот типичный метод, который использует обработку ошибок для потока управления:

from api.database import engine

def entity_count(): 
    """
    Count the entities in a project.
    """

    get_count = ''' 
        SELECT COUNT(*) AS entity_count FROM entity_browser 
    ''' 

    with engine.begin() as conn:
        try: 
            count = conn.execute(count).first().entity_count 
        except ProgrammingError: 
            count = 0 

return count 

В этом примере обработка ошибок обеспечивает быстрый способ определения, существует ли таблица entity_browser: если нет, Postgres выдаст ошибкуэто ловится на уровне DBAPI (psycopg2) и передается в SQLAlchemy как ProgrammingError.

. В тестах я макетирую engine.begin(), чтобы он всегда возвращал соединение с текущей транзакцией, которая былаустановлено в тестовой настройке.К сожалению, это означает, что когда код продолжает выполнение после того, как SQLAlchemy поднял ProgrammingError и psycopg2 прервал транзакцию, SQLAlchemy вызовет InternalError в следующий раз, когда запрос к базе данных будет выполнен с использованием открытого соединения, жалуясь, что транзакция былапрервано.

Вот пример теста, демонстрирующий такое поведение:

import sqlalchemy as sa

def test_entity_count(session):
    """
    Test the `entity_count` method.

    `session` is a fixture that sets up the transaction and mocks out
    database access, returning a Flask-SQLAlchemy `scoped_session` object
    that we can use for queries.
    """

    # Make a change to a table that we can observe later
    session.execute('''
        UPDATE users
        SET name = 'in a test transaction'
        WHERE id = 1
    ''')

    # Drop `entity_browser` in order to raise a `ProgrammingError` later
    session.execute('''DROP TABLE entity_browser''')

    # Run the `entity_count` method, making sure that it raises an error
    with pytest.raises(sa.exc.ProgrammingError):
        count = entity_count()

    assert count == 0

    # Make sure that the changes we made earlier in the test still exist
    altered_name = session.execute('''
        SELECT name
        FROM users
        WHERE id = 1
    ''')

    assert altered_name == 'in a test transaction'

Вот тип вывода, который я получаю:

> altered_name = session.execute('''
      SELECT name
      FROM users
      WHERE id = 1
  ''')

[... traceback history...]

def do_execute(self, cursor, statement, parameters, context=None):
>   cursor.execute(statement, parameters)
E   sqlalchemy.exc.InternalError: (psycopg2.InternalError) current transaction is
    aborted, commands ignored until end of transaction block

Попытки решения

Моим первым инстинктом было попытаться прервать обработку ошибок и принудительно выполнить откат, используя handle_error слушатель событий SQLAlchemy.Я добавил слушателя в тестовое устройство, которое будет откатывать необработанное соединение (поскольку, насколько я понимаю, экземпляры SQLAlchemy Connection не имеют API отката):

@sa.event.listens_for(connection, 'handle_error')
def raise_error(context):
    dbapi_conn = context.connection.connection
    dbapi_conn.rollback()

Это успешно сохраняет транзакцию открытой.для дальнейшего использования, но в итоге откатывает все предыдущие изменения, сделанные в тесте.Пример вывода:

> assert altered_name == 'in a test transaction'
E AssertionError

Очевидно, что откат необработанного соединения слишком агрессивен для подхода.Думая, что мне удастся откатиться до последней точки сохранения, я попытался откатить сеанс с заданной областью, к которому подключен прослушиватель событий, который автоматически открывает новую вложенную транзакцию по завершении предыдущей.(См. Примечание в конце документа по SQLAlchemy о транзакциях в тестах , где приведен пример того, как это выглядит.)

Благодаря макетам, настроенным в приспособлении session,Я могу импортировать сеанс с областью действия непосредственно в прослушиватель событий и откатить его обратно:

@sa.event.listens_for(connection, 'handle_error')
def raise_error(context):
    from api.database import db
    db.session.rollback()

Однако этот подход также вызывает InternalError при следующем запросе.Похоже, что на самом деле он не откатывает транзакцию до удовлетворения основного курсора.

Краткий вопрос

Есть ли способ сохранить транзакцию после того, как ProgrammingError будет поднят?На более абстрактном уровне, что происходит, когда psycopg2 «прерывает» транзакцию, и как я могу ее обойти?

1 Ответ

0 голосов
/ 26 апреля 2018

Корень проблемы в том, что вы скрываете исключение от диспетчера контекста.Вы ловите ProgrammingError слишком рано, и оператор with никогда его не увидит.Ваш entity_count() должен быть:

def entity_count(): 
    """
    Count the entities in a project.
    """

    get_count = ''' 
        SELECT COUNT(*) AS entity_count FROM entity_browser 
    ''' 

    try:
        with engine.begin() as conn:
            count = conn.execute(get_count).first().entity_count

    except ProgrammingError: 
        count = 0 

return count

И затем, если вы предоставите что-то вроде

@contextmanager     
def fake_begin():
    """ Begin a nested transaction and yield the global connection.
    """
    with connection.begin_nested(): 
        yield connection

в качестве проверяемого engine.begin(), соединение останется пригодным для использования.Но @ JL Peyret поднимает вопрос о логике вашего теста.Engine.begin() обычно 1 обеспечивает новое соединение с вооруженной транзакцией из пула, поэтому ваши session и entity_count(), вероятно, даже не должны использовать одно и то же соединение.

1 : зависит от конфигурации пула.

...