Ограничение Flask-SQLAlchemy QueuePool с запросами, выполненными в отдельном потоке - PullRequest
0 голосов
/ 28 января 2019

Когда мое приложение Flask получает запрос, оно порождает объект, который добавляет сообщения в очередь и планирует задание опроса очереди в другом потоке.Каждые две секунды очередь опрашивается на предмет отправки сообщений.Некоторые сообщения требуют запроса к базе данных.В файле app.py цикл while проверяет, остались ли сообщения в очереди, и спит, пока в очереди нет сообщений, а затем фиксирует.

Проблема заключается в том, что это вызывает ошибку ограничения QueuePool.Я проверил свою базу данных PostGreSQL, используя SELECT * FROM pg_stat_activity, и запросы, сделанные в заданиях опроса очереди, бездействуют в транзакции.Я проверил, и scoped_session всегда используется один и тот же.Коммит также действительно вызывается, когда очередь пуста.Так в чем же проблема?

Обратите внимание, что я использую APScheduler для планирования.

Я знаю, что эта реализация не имеет смысла: я не хотел вызывать time.sleep ()между сообщениями, но не осознал, что мне все равно нужно заблокировать основной поток, чтобы сработал коммитНо проблема остается законной независимо от того, насколько опасна реализация.

Простой пример:

application.py

import time
from config import FlaskDatabaseConfig
from queueing import Queue
from flask import Flask
from flask import request
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
application = Flask(__name__)
application.config.from_object(FlaskDatabaseConfig())
db = SQLAlchemy(application)

application.route("/", methods=['GET'])
def handle_req():
    queue = Queue()
    for i in range(3):
         queue.add_message("Test", 1)
    while len(queue.messages) > 0:
         time.sleep(1)
    db.session.commit()

# run the app.
if __name__ == "__main__":
    application.run(debug=False, port=5001)

queueing.py

from application import db
from models import User
from apscheduler.schedulers.background import BlockingScheduler, BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

class Queue():
    def __init__(self):
        self.messages =  []
        self.job = None

    def add_message(self, message, recipient_index):
        self.messages.append((message, recipient_index))
        if not self.job:
            scheduler = BackgroundScheduler()
            scheduler.start()
            job = scheduler.add_job(
                func=self.poll_queue,
                trigger=IntervalTrigger(
                    seconds=2
                ),
                id='poll_queue',
                name='The queue is polled every 2 seconds for sending messages',
                replace_existing=True,
                misfire_grace_time=10)
           self.job = job

    def poll_queue(self):
         if len(self.messages)>0:
             msg, recipient_index = self.messages.pop(0)
             recipient = User.query.get(recipient_index)
             self.send_message(msg, recipient)
         else:
             self.job.remove()

    def send_message(self):
        pass

config.py

class FlaskDatabaseConfig(object):
    def __init__(self):
        """Configuration object for the database."""
        self.POSTGRES_USER = os.environ["RDS_USERNAME"]
        self.POSTGRES_PW = os.environ["RDS_PASSWORD"]
        self.POSTGRES_URL = os.environ["RDS_HOSTNAME"]
        self.POSTGRES_DB = os.environ["RDS_DB_NAME"]

        self.SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://{user}:{pw}@{url}/{db}'.format(user=self.POSTGRES_USER,
                                                                                    pw=self.POSTGRES_PW,
                                                                                    url=self.POSTGRES_URL,
                                                                                    db=self.POSTGRES_DB)

models.py

from application import db
class User(db.Model):
    index= db.Column(db.Integer(), autoincrement=True, primary_key=True, unique=True)
...