Индексирование моделей SQLAlchemy в ElasticSearch - PullRequest
0 голосов
/ 14 мая 2019

Я пытаюсь использовать сигналы sqlalchemy для обновления индекса эластичного поиска, отражающего определенные модели в моей БД.

Проблема, с которой я сталкиваюсь, заключается в том, что при создании документов ES я получаю доступ к некоторым моделям 'отношения, и это означает, что в цикле сигналов есть несколько точек, к которым отношения не могут быть доступны.Я создавал документ ES из экземпляра модели в слушателях для after_(insert|udpate|delete), но они выдаются, когда соединение находится в состоянии flushing, и кажется, что отношения возвращают None в этот момент, поэтому документ ES не можетбыть построенным.

Я изменил его, поэтому я просто сохранил очередь команд, которые должны быть отправлены, вместо этого сохранив ссылку на модель, и планировалось выпустить их при фиксации, но сигнал before_commit был выдан. до сброса, и когда выдается after_commit, к БД больше нельзя получить доступ, поэтому снова отношения недоступны.

Мне кажется, что настал подходящий момент для создания Документов и их сохранения.находится в сигнале after_flush_postexec, но у меня есть ощущение, что между этим моментом и моментом выдачи commit может произойти откат, и тогда индекс ES больше не будет отражать DB.

IЯ не уверен, что лучший способ сделать это.

Вот код, с которым я сейчас работаю.ChargeIndexer.upsert принимает модель SQLAlchemy ORM и использует API эластичного поиска для вставки / обновления созданного из него документа, а .delete делает то же самое, за исключением, конечно, его удаления, и зависит только от id. * 1023 модели.*

class ChargeListener(object):

    ops = []

    @event.listens_for(Charge, 'after_delete', propagate=True)
    def after_delete(mapper, connection, target):
        ChargeListener.add_delete(target)

    @event.listens_for(Charge, 'after_insert', propagate=True)
    def after_insert(mapper, connection, target):
        ChargeListener.add_insert(target)

    @event.listens_for(Charge, 'after_update', propagate=True)
    def after_update(mapper, connection, target):
        ChargeListener.add_insert(target)

    @classmethod
    def execute_ops(cls):
        charges_indexer = ChargesIndexer()
        for op, charge in ChargeListener.ops:
            if op == 'insert':
                charges_indexer.upsert(charge)
            elif op == 'delete':
                charges_indexer.delete(charge)

        ChargeListener.reset()

    @event.listens_for(sess, 'after_flush_postexec')
    def after_flush_postexec(session, flush_context):
        ChargeListener.execute_ops()

    @event.listens_for(sess, 'after_soft_rollback')
    def after_soft_rollback(session, previous_transaction):
        ChargeListener.reset()

    @classmethod
    def add_insert(cls, charge):
        cls.ops.append(('insert', charge))

    @classmethod
    def add_delete(cls, charge):
        cls.ops.append(('delete', charge))

    @classmethod
    def reset(cls):
        cls.ops = []
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...