Использование SQLAlchemy scoped_session с Celery - PullRequest
0 голосов
/ 04 октября 2018

Я использую SQLAlchemy с Celery NOT в качестве бэкэнда (используемый мной бэкэнд Celery - RabbitMQ).

Чтение этого вопроса и SQLAlchemyдокументация Я придумываю это решение:

Сначала я реализовал класс, расширяющий класс Celery 'Task:

class DBTask(Task):
    _engine = None
    _Session = None

    def __init__(self):
        self.logger = get_task_logger('__name__')

    def after_return(self, *args, **kwargs):
        self.logger.info("DBTask: Removing session. {}".format(self._Session))
        if self._Session is not None:
            self.logger.debug("DBTask: Session removed. {}".format(self._Session))
            self._Session.remove()

    @property
    def engine(self):
        if self._engine is None:
            self.logger.info("DBTask: Creating engine.")
            self.config = GlobalConfigReader().get_config()
            self._engine = create_engine(self.config['postgres_uri'], pool_pre_ping=True)
        return self._engine

    @property
    def Session(self):
        if self._Session is None:
            self._Session = scoped_session(sessionmaker(bind=self.engine))
            self.logger.info("DBTask: Creating new session: {}".format(self._Session))
            Base.metadata.create_all(self.engine)
        return self._Session

И в одной из моих задач я делаю следующее:

self process(self, message):
        try:
            session = self.Session()

            db_alert = self._find_existing_alert(session, message['event'], 
            session.commit()
        except:
            self.logger.debug("Rolling back session. {}".format(session))
            session.rollback()
            raise
        # further processing
        context.delay(message)

Кажется, что каждая задача Celery, запущенная в потоке, создает новое соединение с БД и никогда не закрывает его.Это прометей-график состояния базы данных:

prometheus graph

Похоже, что соединения значительно увеличиваются в начале сельдерея, и они остаются несколько стабильными.(Падение связано с тем, что я перезапустил Postgres).

Еще одна вещь, которую я не могу объяснить, это то, почему, когда вызывается after_return, self_Session всегда равен None, поэтому self._Session.remove() никогда не вызывается.

[EDIT]: просматривая все журналы, я понял, что иногда self._Session.remove() - это не None, поэтому вызывается self._Session.remove(), но не каждый раз after_return.

...