У меня немного необычная проблема с состоянием транзакции и обработкой ошибок в SQLAlchemy.Короткая версия: есть ли способ сохранить транзакцию, когда SQLAlchemy вызывает ProgrammingError
и прерывает ее?
Фон
Я работаю над интеграционным тестомнабор для унаследованной кодовой базы.Сейчас я разрабатываю набор приспособлений, которые позволят нам запускать все тесты внутри транзакций, вдохновленные документацией SQLAlchemy .Общая парадигма включает в себя открытие соединения, запуск транзакции, привязку сеанса к этому соединению, а затем моделирование большинства методов доступа к базе данных, чтобы они использовали эту транзакцию.(Чтобы понять, как это выглядит, см. Код, приведенный в ссылке на документацию выше, включая примечание в конце.) Цель состоит в том, чтобы позволить себе запускать методы из кодовой базы, которые выполняют много обновлений базы данных вконтекст теста, с гарантией того, что любые побочные эффекты, которые могут изменить базу данных теста, будут отменены после завершения теста.
Моя проблема заключается в том, что код часто полагается на обработку ошибок DBAPI для достижения контроляпоток при выполнении запросов, и эти ошибки автоматически прерывают транзакции (за документацию psycopg2 ).Это создает проблему, поскольку мне нужно сохранить работу, которая была выполнена в этой транзакции, до момента возникновения ошибки, и мне нужно продолжать использовать транзакцию после обработки ошибки.
Вот типичный метод, который использует обработку ошибок для потока управления:
from api.database import engine
def entity_count():
"""
Count the entities in a project.
"""
get_count = '''
SELECT COUNT(*) AS entity_count FROM entity_browser
'''
with engine.begin() as conn:
try:
count = conn.execute(count).first().entity_count
except ProgrammingError:
count = 0
return count
В этом примере обработка ошибок обеспечивает быстрый способ определения, существует ли таблица entity_browser
: если нет, Postgres выдаст ошибкуэто ловится на уровне DBAPI (psycopg2) и передается в SQLAlchemy как ProgrammingError
.
. В тестах я макетирую engine.begin()
, чтобы он всегда возвращал соединение с текущей транзакцией, которая былаустановлено в тестовой настройке.К сожалению, это означает, что когда код продолжает выполнение после того, как SQLAlchemy поднял ProgrammingError
и psycopg2 прервал транзакцию, SQLAlchemy вызовет InternalError
в следующий раз, когда запрос к базе данных будет выполнен с использованием открытого соединения, жалуясь, что транзакция былапрервано.
Вот пример теста, демонстрирующий такое поведение:
import sqlalchemy as sa
def test_entity_count(session):
"""
Test the `entity_count` method.
`session` is a fixture that sets up the transaction and mocks out
database access, returning a Flask-SQLAlchemy `scoped_session` object
that we can use for queries.
"""
# Make a change to a table that we can observe later
session.execute('''
UPDATE users
SET name = 'in a test transaction'
WHERE id = 1
''')
# Drop `entity_browser` in order to raise a `ProgrammingError` later
session.execute('''DROP TABLE entity_browser''')
# Run the `entity_count` method, making sure that it raises an error
with pytest.raises(sa.exc.ProgrammingError):
count = entity_count()
assert count == 0
# Make sure that the changes we made earlier in the test still exist
altered_name = session.execute('''
SELECT name
FROM users
WHERE id = 1
''')
assert altered_name == 'in a test transaction'
Вот тип вывода, который я получаю:
> altered_name = session.execute('''
SELECT name
FROM users
WHERE id = 1
''')
[... traceback history...]
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlalchemy.exc.InternalError: (psycopg2.InternalError) current transaction is
aborted, commands ignored until end of transaction block
Попытки решения
Моим первым инстинктом было попытаться прервать обработку ошибок и принудительно выполнить откат, используя handle_error
слушатель событий SQLAlchemy.Я добавил слушателя в тестовое устройство, которое будет откатывать необработанное соединение (поскольку, насколько я понимаю, экземпляры SQLAlchemy Connection
не имеют API отката):
@sa.event.listens_for(connection, 'handle_error')
def raise_error(context):
dbapi_conn = context.connection.connection
dbapi_conn.rollback()
Это успешно сохраняет транзакцию открытой.для дальнейшего использования, но в итоге откатывает все предыдущие изменения, сделанные в тесте.Пример вывода:
> assert altered_name == 'in a test transaction'
E AssertionError
Очевидно, что откат необработанного соединения слишком агрессивен для подхода.Думая, что мне удастся откатиться до последней точки сохранения, я попытался откатить сеанс с заданной областью, к которому подключен прослушиватель событий, который автоматически открывает новую вложенную транзакцию по завершении предыдущей.(См. Примечание в конце документа по SQLAlchemy о транзакциях в тестах , где приведен пример того, как это выглядит.)
Благодаря макетам, настроенным в приспособлении session
,Я могу импортировать сеанс с областью действия непосредственно в прослушиватель событий и откатить его обратно:
@sa.event.listens_for(connection, 'handle_error')
def raise_error(context):
from api.database import db
db.session.rollback()
Однако этот подход также вызывает InternalError
при следующем запросе.Похоже, что на самом деле он не откатывает транзакцию до удовлетворения основного курсора.
Краткий вопрос
Есть ли способ сохранить транзакцию после того, как ProgrammingError
будет поднят?На более абстрактном уровне, что происходит, когда psycopg2 «прерывает» транзакцию, и как я могу ее обойти?