сигналы или триггеры в SQLAlchemy - PullRequest
12 голосов
/ 03 августа 2009

Есть ли в SQLAlchemy что-то похожее на концепцию сигналов Django? По сути, я хотел бы вызвать несколько функций, когда я предварительно сохраняю или сохраняю некоторые объекты сущностей. Спасибо.

Редактировать: Я просто хочу эквивалент django-сигналов в SQLAlchemy.

Ответы [ 5 ]

9 голосов
/ 30 сентября 2012

Я думаю, что вы ищете "ORM Events". Вы можете найти документацию здесь:

http://docs.sqlalchemy.org/en/latest/orm/events.html

5 голосов
/ 03 августа 2009

Вы не уточнили, интегрируете ли вы SQLAlchemy и Django, или вы просто хотите получить эквивалент django-сигналов в SQLAlchemy.

Если вам нужен эквивалент сигналов Django, таких как post_save, pre_save, pre_delete и т. Д., Я бы рекомендовал вам страницу,

sqlalchemy.orm.interfaces.MapperExtension

2 голосов
/ 27 декабря 2010

Вот мой взгляд на эту проблему, она использует Луи для отправки сигналов:

dispatch.py

"""
Signals dispatching for SQLAlchemy mappers.
"""

import louie
from sqlalchemy.orm.interfaces import MapperExtension
import signals


class LouieDispatcherExtension(MapperExtension):
    """
    Dispatch signals using louie on insert, update and delete actions.
    """

    def after_insert(self, mapper, connection, instance):
        louie.send(signals.after_insert, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).after_insert(mapper,
                connection, instance)

    def after_delete(self, mapper, connection, instance):
        louie.send(signals.after_delete, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).after_delete(mapper,
                connection, instance)

    def after_update(self, mapper, connection, instance):
        louie.send(signals.after_update, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).after_update(mapper,
                connection, instance)

    def before_delete(self, mapper, connection, instance):
        louie.send(signals.before_delete, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).before_delete(mapper,
                connection, instance)

    def before_insert(self, mapper, connection, instance):
        louie.send(signals.before_insert, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).before_insert(mapper,
                connection, instance)

    def before_update(self, mapper, connection, instance):
        louie.send(signals.before_update, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).before_update(mapper,
                connection, instance)

signals.py

from louie import Signal


class after_delete(Signal): pass
class after_insert(Signal): pass
class after_update(Signal): pass
class before_delete(Signal): pass
class before_insert(Signal): pass
class before_update(Signal): pass

Пример использования:

class MyModel(DeclarativeBase):

    __mapper_args__ = {"extension": LouieDispatcherExtension()}

    ID = Column(Integer, primary_key=True)
    name = Column(String(255))

def on_insert(instance):
    print "inserted %s" % instance

louie.connect(on_insert, signals.after_insert, MyModel)
2 голосов
/ 03 августа 2009

Вы можете рассмотреть sqlalchemy.orm.SessionExtension , а также

Вот код, который я создал вместе, чтобы установить идентификатор владельца для экземпляра и установить update_date, чтобы получить работу, выполненную в приложении пилонов. класс OrmExt - это место, где происходит вся магия. И init_model - это место, где вы подключаете его.

import logging
import sqlalchemy as sa
from sqlalchemy import orm

from pylons import session

import datetime

log = logging.getLogger(__name__)

class ORMSecurityException(Exception):
    '''
    thrown for security violations in orm layer
    '''
    pass

def _get_current_user():
    log.debug('getting current user from session...')
    log.debug(session)
    return session['user'] 

def _is_admin(user):
    return False  


def set_update_date(instance):

    if hasattr(instance,'update_date'):
    instance.update_date = datetime.datetime.now()

def set_owner(instance):
    '''
    if owner_id, run it through the rules
    '''
    log.info('set_owner')
    if hasattr(instance, 'owner_id'):
    log.info('instance.owner_id=%s' % instance.owner_id)
    u = _get_current_user()
    log.debug('user: %s' % u.email)
    if not u:
        #anonymous users can't save owned objects
        raise ORMSecurityException()
    if instance.owner_id==None:
        #must be new object thus, owned by current user
        log.info('setting owner on object %s for user: %s' % (instance.__class__.__name__,u.email))
        instance.owner_id = u.id
    elif instance.owner_id!=u.id and not _is_admin(u):
        #if owner_id does not match user_id and user is not admin VIOLATION
        raise ORMSecurityException()
    else:
        log.info('object is already owned by this user')
        return #good to go
else:
    log.info('%s is not an owned object' % instance.__class__.__name__)
    return

def instance_policy(instance):
    log.info('setting owner for %s' % instance.__class__.__name__)
    set_owner(instance)
    log.info('setting update_date for %s' % instance.__class__.__name__)
    set_update_date(instance)


class ORMExt(orm.SessionExtension):
    '''
    attempt at managing ownership logic on objects
    '''
    def __init__(self,policy):
        self._policy = policy

    def before_flush(self,sqlsess,flush_context,instances):
        '''
        check all instances for owner_id==user.id
        '''
        try:
            for instance in sqlsess.deleted:
                try:
                    log.info('running policy for deleted %s' % instance.__class__.__name__)
                    self._policy(instance)
                except Exception,ex:
                    log.error(ex)
                    raise ex

            for instance in sqlsess.new:
                try:
                    log.info('running policy for new %s' % instance.__class__.__name__)
                    self._policy(instance)
                except Exception,ex:
                    log.error(ex)
                    raise ex

            for instance in sqlsess.dirty:
                try:
                    if sqlsess.is_modified(instance,include_collections=False,passive=True):
                        log.info('running policy for updated %s' % instance.__class__.__name__)
                        self._policy(instance)
                except Exception, ex:
                    log.error(ex)
                    raise ex

        except Exception,ex:
            sqlsess.expunge_all()
            raise ex

def init_model(engine):
    """Call me before using any of the tables or classes in the model"""
    sm = orm.sessionmaker(autoflush=True, autocommit=True, bind=engine,extension=ORMExt(instance_policy))
    meta.engine = engine
    meta.Session = orm.scoped_session(sm)
0 голосов
/ 19 декабря 2013

Вы можете использовать внутренний MapperExtension класс:

class YourModel(db.Model):

    class BaseExtension(MapperExtension):

        def before_insert(self, mapper, connection, instance):
            # do something here

        def before_update(self, mapper, connection, instance):
            # do something here

    __mapper_args__ = { 'extension': BaseExtension() }

    # ....
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...