Я сейчас пытаюсь создать приложение, которое обрабатывает HTTP-запросы с помощью API REST. Приложение использует Flask для веб-фреймворка и использует Celery для асинхронных задач.
Вот структура приложения.
app.py
celery_app.py
controller/
controller.py
task/
task.py
(1) app.py
Нет строк для конфигурации Celery.
from flask import Flask
...
app = Flask(__name__)
...
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
(2) celery.py
from consts.consts import Consts
from kombu.utils.url import quote
from celery import Celery
from config.config import Config
config = Config()
db_uri = 'db+' + config.get_db_uri()
aws_access_key = quote(config.get_config(Consts.AWS, Consts.AWS_ACCESS_KEY))
aws_secret_key = quote(config.get_config(Consts.AWS, Consts.AWS_SECRET_KEY))
broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
aws_access_key=aws_access_key, aws_secret_key=aws_secret_key
)
celery = Celery(
broker=broker_url,
backend=db_uri,
include=['task']
)
(3) task / task.py
Я положил все задачи здесь.
from celery_app import celery
from flask import current_app as app
from model.model import db
@celery.task
def check_friend_status_from_db(user1, user2):
status = db.engine.execute(
"QUERY").fetchone()
return status
Теперь файл controller/controller.py
импортирует и вызывает задачи следующим образом.
(4) controller / controller.py
from flask import Blueprint, request, json, render_template, jsonify
from mysql.connector import Error
from sqlalchemy import or_, and_
from consts.consts import StatusCode
from consts.consts import Consts
from model.model import db, FriendRequest
from outbound.request import Request
from util.logging import logger
from task.task import check_friend_status_from_db
controller_blueprint = Blueprint('controller_blueprint', __name__)
outbound_request = Request()
@controller_blueprint.route('/friend/status/<requested_from>/<requested_to>', methods=['GET'])
def check_friend_status(requested_from, requested_to):
logger.info('Checking the friend status')
try:
status = check_friend_status_from_db.apply_async((requested_from, requested_to)).get()
if status is None:
response = {
Consts.STATUS_CODE: StatusCode.OK,
Consts.FRIEND_STATUS: StatusCode.NO_RELATION
}
else:
response = {
Consts.STATUS_CODE: StatusCode.OK,
Consts.FRIEND_STATUS: status
}
except Error as e:
logger.error("TypeError:", e)
response = {
Consts.STATUS_CODE: StatusCode.ERROR
}
json_response = jsonify(response)
logger.info(json_response)
return json_response
Когда я запускаю код, я получаю ошибку, как я упоминал в заголовке.
RuntimeError: No application found. Either work inside a view function or push an application context
, и оказывается, что эта часть находится под блоком try в контроллере Откуда исходит ошибка?
status = check_friend_status_from_db.apply_async((requested_from, requested_to)).get()
Есть какие-нибудь решения, пожалуйста?