Python Flask Контекст приложения, как открыть приложение для других модулей - PullRequest
0 голосов
/ 23 апреля 2020

У меня есть скрипт python, который использует Flask -SqlAlchemy для доступа к базе данных Postgres. Однако всякий раз, когда я пытаюсь выполнить запрос к базе данных, я получаю сообщение об ошибке «вне контекста». Я решил, что способ сделать это - обернуть его в app.app_context:

import psycopg2
import json
from simple_chalk import redBright
from ...models.bin import Bin
from ...models.station import Station
from ... import db
from datetime import datetime as dt
from ... import current_app as app

def findPositionBin(stationId, find_position):
    try:
        with app.app_context():
            result = Bin.query.filter_by(station_id=stationId).filter_by(position=find_position).first()
            print("result")
            return result
    except Exception as e:
        print(redBright(e))

Однако для этого мне нужно будет импортировать приложение. Проблема в том, что у моего root init .py есть приложение, содержащееся в функции, вызываемой wsgi.py для запуска программы.

init .py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_socketio import SocketIO
from flask_cors import CORS
import eventlet
import threading


db = SQLAlchemy()
migrate = Migrate()
socketio = SocketIO()


def create_app():
    app = Flask(__name__, instance_relative_config=False)
    CORS(app)
    app.config.from_object('config.Config')
    eventlet.monkey_patch()
    socketio.init_app(app, cors_allowed_origins='*', async_mode='eventlet')
    migrate.init_app(app, db)

    with app.app_context():
        from . import routes
        from . import wsroutes
        from .models import user, bin, ip_port, station
        from .blueprints import user
        from .blueprints.CubeStation import station_routes
        from.database.CubeStation import station_database
        from .server import startServer
        from .blueprints.CubeStation.station_functions import broadcastLoop
        # from .database.CubeStation import station_database
        db.init_app(app)
        app.register_blueprint(user.user_blueprint)
        app.register_blueprint(station_routes.station_blueprint)
        # app.register_blueprint(station_database.database_blueprint)
        x = threading.Thread(target=startServer)
        x.start()
        t = threading.Thread(target=broadcastLoop)
        t.start()
        db.create_all()

        return app

Кто-нибудь случайно узнал, как я могу открыть приложение, чтобы оно могло быть импортировано другими модулями? Или, если есть лучший подход к этому. Заранее спасибо

1 Ответ

0 голосов
/ 23 апреля 2020

На основе здесь и здесь , может быть, что-то в этом роде будет работать:

from init import app

def app_context():
    with app.app_context():
        yield


def findPositionBin(stationId, find_position, app_context):
    try:
        with app.app_context():
            result = Bin.query.filter_by(station_id=stationId).filter_by(position=find_position).first()
            print("result")
            return result
    except Exception as e:
        print(redBright(e))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...