Когда мое приложение Flask получает запрос, оно порождает объект, который добавляет сообщения в очередь и планирует задание опроса очереди в другом потоке.Каждые две секунды очередь опрашивается на предмет отправки сообщений.Некоторые сообщения требуют запроса к базе данных.В файле app.py цикл while проверяет, остались ли сообщения в очереди, и спит, пока в очереди нет сообщений, а затем фиксирует.
Проблема заключается в том, что это вызывает ошибку ограничения QueuePool.Я проверил свою базу данных PostGreSQL, используя SELECT * FROM pg_stat_activity
, и запросы, сделанные в заданиях опроса очереди, бездействуют в транзакции.Я проверил, и scoped_session всегда используется один и тот же.Коммит также действительно вызывается, когда очередь пуста.Так в чем же проблема?
Обратите внимание, что я использую APScheduler для планирования.
Я знаю, что эта реализация не имеет смысла: я не хотел вызывать time.sleep ()между сообщениями, но не осознал, что мне все равно нужно заблокировать основной поток, чтобы сработал коммитНо проблема остается законной независимо от того, насколько опасна реализация.
Простой пример:
application.py
import time
from config import FlaskDatabaseConfig
from queueing import Queue
from flask import Flask
from flask import request
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
application = Flask(__name__)
application.config.from_object(FlaskDatabaseConfig())
db = SQLAlchemy(application)
application.route("/", methods=['GET'])
def handle_req():
queue = Queue()
for i in range(3):
queue.add_message("Test", 1)
while len(queue.messages) > 0:
time.sleep(1)
db.session.commit()
# run the app.
if __name__ == "__main__":
application.run(debug=False, port=5001)
queueing.py
from application import db
from models import User
from apscheduler.schedulers.background import BlockingScheduler, BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
class Queue():
def __init__(self):
self.messages = []
self.job = None
def add_message(self, message, recipient_index):
self.messages.append((message, recipient_index))
if not self.job:
scheduler = BackgroundScheduler()
scheduler.start()
job = scheduler.add_job(
func=self.poll_queue,
trigger=IntervalTrigger(
seconds=2
),
id='poll_queue',
name='The queue is polled every 2 seconds for sending messages',
replace_existing=True,
misfire_grace_time=10)
self.job = job
def poll_queue(self):
if len(self.messages)>0:
msg, recipient_index = self.messages.pop(0)
recipient = User.query.get(recipient_index)
self.send_message(msg, recipient)
else:
self.job.remove()
def send_message(self):
pass
config.py
class FlaskDatabaseConfig(object):
def __init__(self):
"""Configuration object for the database."""
self.POSTGRES_USER = os.environ["RDS_USERNAME"]
self.POSTGRES_PW = os.environ["RDS_PASSWORD"]
self.POSTGRES_URL = os.environ["RDS_HOSTNAME"]
self.POSTGRES_DB = os.environ["RDS_DB_NAME"]
self.SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://{user}:{pw}@{url}/{db}'.format(user=self.POSTGRES_USER,
pw=self.POSTGRES_PW,
url=self.POSTGRES_URL,
db=self.POSTGRES_DB)
models.py
from application import db
class User(db.Model):
index= db.Column(db.Integer(), autoincrement=True, primary_key=True, unique=True)