Я пытаюсь использовать сигналы sqlalchemy для обновления индекса эластичного поиска, отражающего определенные модели в моей БД.
Проблема, с которой я сталкиваюсь, заключается в том, что при создании документов ES я получаю доступ к некоторым моделям 'отношения, и это означает, что в цикле сигналов есть несколько точек, к которым отношения не могут быть доступны.Я создавал документ ES из экземпляра модели в слушателях для after_(insert|udpate|delete)
, но они выдаются, когда соединение находится в состоянии flushing
, и кажется, что отношения возвращают None
в этот момент, поэтому документ ES не можетбыть построенным.
Я изменил его, поэтому я просто сохранил очередь команд, которые должны быть отправлены, вместо этого сохранив ссылку на модель, и планировалось выпустить их при фиксации, но сигнал before_commit
был выдан. до сброса, и когда выдается after_commit
, к БД больше нельзя получить доступ, поэтому снова отношения недоступны.
Мне кажется, что настал подходящий момент для создания Документов и их сохранения.находится в сигнале after_flush_postexec
, но у меня есть ощущение, что между этим моментом и моментом выдачи commit
может произойти откат, и тогда индекс ES больше не будет отражать DB.
IЯ не уверен, что лучший способ сделать это.
Вот код, с которым я сейчас работаю.ChargeIndexer.upsert
принимает модель SQLAlchemy ORM и использует API эластичного поиска для вставки / обновления созданного из него документа, а .delete
делает то же самое, за исключением, конечно, его удаления, и зависит только от id
. * 1023 модели.*
class ChargeListener(object):
ops = []
@event.listens_for(Charge, 'after_delete', propagate=True)
def after_delete(mapper, connection, target):
ChargeListener.add_delete(target)
@event.listens_for(Charge, 'after_insert', propagate=True)
def after_insert(mapper, connection, target):
ChargeListener.add_insert(target)
@event.listens_for(Charge, 'after_update', propagate=True)
def after_update(mapper, connection, target):
ChargeListener.add_insert(target)
@classmethod
def execute_ops(cls):
charges_indexer = ChargesIndexer()
for op, charge in ChargeListener.ops:
if op == 'insert':
charges_indexer.upsert(charge)
elif op == 'delete':
charges_indexer.delete(charge)
ChargeListener.reset()
@event.listens_for(sess, 'after_flush_postexec')
def after_flush_postexec(session, flush_context):
ChargeListener.execute_ops()
@event.listens_for(sess, 'after_soft_rollback')
def after_soft_rollback(session, previous_transaction):
ChargeListener.reset()
@classmethod
def add_insert(cls, charge):
cls.ops.append(('insert', charge))
@classmethod
def add_delete(cls, charge):
cls.ops.append(('delete', charge))
@classmethod
def reset(cls):
cls.ops = []