Я использую SQLAlchemy с Celery NOT в качестве бэкэнда (используемый мной бэкэнд Celery - RabbitMQ).
Чтение этого вопроса и SQLAlchemyдокументация Я придумываю это решение:
Сначала я реализовал класс, расширяющий класс Celery 'Task
:
class DBTask(Task):
_engine = None
_Session = None
def __init__(self):
self.logger = get_task_logger('__name__')
def after_return(self, *args, **kwargs):
self.logger.info("DBTask: Removing session. {}".format(self._Session))
if self._Session is not None:
self.logger.debug("DBTask: Session removed. {}".format(self._Session))
self._Session.remove()
@property
def engine(self):
if self._engine is None:
self.logger.info("DBTask: Creating engine.")
self.config = GlobalConfigReader().get_config()
self._engine = create_engine(self.config['postgres_uri'], pool_pre_ping=True)
return self._engine
@property
def Session(self):
if self._Session is None:
self._Session = scoped_session(sessionmaker(bind=self.engine))
self.logger.info("DBTask: Creating new session: {}".format(self._Session))
Base.metadata.create_all(self.engine)
return self._Session
И в одной из моих задач я делаю следующее:
self process(self, message):
try:
session = self.Session()
db_alert = self._find_existing_alert(session, message['event'],
session.commit()
except:
self.logger.debug("Rolling back session. {}".format(session))
session.rollback()
raise
# further processing
context.delay(message)
Кажется, что каждая задача Celery, запущенная в потоке, создает новое соединение с БД и никогда не закрывает его.Это прометей-график состояния базы данных:
![prometheus graph](https://i.stack.imgur.com/Pqqxv.png)
Похоже, что соединения значительно увеличиваются в начале сельдерея, и они остаются несколько стабильными.(Падение связано с тем, что я перезапустил Postgres).
Еще одна вещь, которую я не могу объяснить, это то, почему, когда вызывается after_return
, self_Session
всегда равен None, поэтому self._Session.remove()
никогда не вызывается.
[EDIT]: просматривая все журналы, я понял, что иногда self._Session.remove()
- это не None, поэтому вызывается self._Session.remove()
, но не каждый раз after_return
.