Проблема
Я использую шаблон для кэширования объектов SQLAlchemy в Redis. Всякий раз, когда экземпляр изменяется и фиксируется, я хочу очистить соответствующие кэши, чтобы он был перезагружен при следующей выборке. Это очищение должно произойти после фиксации , чтобы избежать условий гонки (другой поток запрашивает кэш, пропускает и загружает устаревшие данные в кэш).
Я боролся с этим долгое время, приходя с различными решениями, которые иногда работают, но не пуленепробиваемые. Это кажется достаточно простой проблемой, которая должна быть решена. Как я запускаю некоторый код каждый раз, когда изменение фиксируется в экземпляре SQLAlchemy?
Что я пробовал
События
Я пытался соединить вместе некоторую SQLAlchemy события для достижения моей цели с переменным уровнем успеха. Прослушивание «after_insert» и «after_update» скажет мне, когда объект был изменен, а «after_commit» скажет мне, что все, что было изменено, было сохранено, поэтому у меня была схема, где первые два события регистрировали слушателей для «after_commit», что в свою очередь передаст объект моей функции очистки кэша. Например:
def _register_after_commit(_: Mapper, __: Connection, target: MyClass) -> None:
""" Generic callback that adds this function for a target change without params """
targets.add(target) # Clear cache uses this set to know which instances to clear
event.listen(get_session(), "after_commit", clear_cache)
event.listen(MyClass, "after_insert", _register_after_commit)
event.listen(MyClass, "after_update", _register_after_commit)
Это работает большую часть времени, но иногда я получаю DetachedInstanceError
при доступе к атрибутам на цели, которые мне нужно знать, чтобы очистить их из кэша (например, id
). Я читал, что это происходит из-за того, что во время коммита истекает автомат c, и SQLAlchemy хочет обновить все атрибуты sh. Я не могу отключить автоматическое истечение срока действия, и при этом я не могу удалить все объекты, которые здесь проходят, любой из них может привести к поломке других частей базы кода.
Пользовательский сеанс
I создал свой собственный сеансовый класс, который выглядел примерно так:
class SessionWithCallback(scoped_session):
""" A version of orm.Session which can call a method after commit completes """
def __init__(self, session_factory, scopefunc = None) -> None:
super().__init__(session_factory=session_factory, scopefunc=scopefunc)
self._callbacks = {}
def add_callback(self, func, *args, **kwargs) -> None:
"""
Adds a callback to be called after commit, ensuring only a single
instance of the callback for each set of args and kwargs is used
"""
key = f"{func}.{args}.{kwargs}"
self._callbacks[key] = (func, args, kwargs)
def run_callbacks(self) -> None:
"""
Executes all callbacks
"""
for (func, args, kwargs) in self._callbacks.values():
func(*args, **kwargs)
self._callbacks = {}
def commit(self) -> None:
""" Flush and commit the current transaction """
super().commit()
self.run_callbacks()
Тогда вместо _register_after_commit
с использованием события after_commit будет вызываться функция add_callback
текущего сеанса. Казалось, что это работает при запуске тестов только с SQLAlchemy, но разваливается при интеграции с приложением Flask, которое использует эти модели и использует Flask -SQLAlchemy. Я следовал инструкциям для настройки сеанса (переопределяя create_session
в экземпляре SQLAlchemy), но как только я что-то фиксирую, я получаю исключение, что scoped_session
не имеет атрибута add_callback
. Я прошел через него, и он каким-то образом использует мой класс, но сеанс, который он мне дает, не является экземпляром моего класса. Непонятно.
Я рассмотрел
- Сохранение первичных ключей в моих слушателях, а затем требование обратных вызовов для открытия сеанса и запроса самого нового экземпляра, если ему требуется дополнительная информация. Может работать, но мне кажется, что мне нужен дополнительный ввод / вывод, который мне не нужен. У меня может быть несколько разных обратных вызовов для одного экземпляра, все они требуют много работы.
- Наличие некоторого глобального места для хранения обратных вызовов вместо Session, так что я могу избежать функции
add_callback
. Мне все еще нужно сделать этот сеанс-специфицированный c и потокобезопасным, хотя. Это достаточно просто в Flask, но Flask - не единственное приложение, которому нужно делиться этим кодом. - Простое выполнение этих операций кеширования очищает вручную ... но это может вызвать ошибку разработчика.
- Создание некоторого отложенного по времени задания для очистки кэша из "after_insert / update". Это очень сложно, очень быстро и звучит как настоящая головная боль. Например, как вы решаете, как долго ждать?