Правильная обработка сеанса SQLAlchemy в многопоточных приложениях - PullRequest
21 голосов
/ 08 марта 2012

У меня проблемы с пониманием того, как правильно открывать и закрывать сеансы базы данных, как я понял из документации sqlalchemy, если я использую scoped_session для создания объекта Session, а затем использую возвращенный объект Session для создания сеансов, это потокобезопасно, поэтому в основном каждый поток получит свой собственный сеанс, и с ним проблем не будет. Теперь приведенный ниже пример работает, я помещаю его в бесконечный цикл, чтобы посмотреть, правильно ли он закрывает сеансы, и если я его правильно контролировал (в mysql, выполнив «SHOW PROCESSLIST;»), соединения просто продолжают расти, он не закрывает их , хотя я использовал session.close () и даже удаляю объект scoped_session в конце каждого запуска. Что я делаю неправильно? Моя цель в более крупном приложении - использовать минимальное количество требуемых соединений с базой данных, потому что моя текущая рабочая реализация создает новый сеанс в каждом методе, где он требуется, и закрывает его перед возвратом, что кажется неэффективным.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel


DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
        self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
        self.DBSession = scoped_session(
            sessionmaker(
                autoflush=True,
                autocommit=False,
                bind=self.db_engine
            )
        )

    def _worker(self):
        db_session = self.DBSession()
        while True:
            try:
                task_id = self.task_queue.get(False)
                try:
                    item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
                    # do something with item
                except Exception as exc:
                    # if an error occurrs we skip it
                    continue

                finally:
                    db_session.commit()
                    self.task_queue.task_done()
            except QueueEmpty:
                db_session.close()
                return

    def start(self):
        try:
            db_session = self.DBSession()
            all_items = db_session.query(MyModel).all()
            for item in all_items:
                self.task_queue.put(item.id)

            for _i in range(self.worker_count):
                t = Thread(target=self._worker)
                t.start()

            self.task_queue.join()
        finally:
            db_session.close()
            self.DBSession.remove()


if __name__ == '__main__':
    while True:
        mt_worker = MTWorker(worker_count=50)
        mt_worker.start()

1 Ответ

38 голосов
/ 08 марта 2012

Вы должны звонить create_engine и scoped_session только один раз процесс (на базу данных). Каждый получит свой пул соединений или сеансов (соответственно), поэтому вы хотите убедиться, что вы создаете только один пул. Просто сделайте это глобальным уровнем модуля. если вам нужно более точно управлять сессиями, вам, вероятно, не следует использовать scoped_session

Еще одно изменение - использование DBSession напрямую, как если бы оно было сессия. вызов методов сеанса в scoped_session будет прозрачно при необходимости создайте локальный сеанс потока и перенаправьте вызов метода сессия.

Другая вещь, о которой следует знать, это pool_size из пула соединений, который по умолчанию 5 Для многих приложений это нормально, но если вы создаете много потоков, вам может понадобиться настроить этот параметр

DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
DBSession = scoped_session(
    sessionmaker(
        autoflush=True,
        autocommit=False,
        bind=db_engine
    )
)


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
# snip
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...