Я написал много кода, используя модель flask_sqlalchemy, и у меня есть много классов orm, унаследованных от примера db.Model.
из кода. модель:
from threading import Thread
from queue import Queue
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
run_queue_flag = True
num_of_analyser_workers = 10
processes_dict = {}
tasks_queue = Queue()
error_queue = Queue()
updates_queue = Queue()
to_do_queue = Queue()
done_queue = Queue()
class ErrorLog(db.Model):
__tablename__ = 'ErrorLog'
id = db.Column(db.Integer, primary_key=True)
task_id = db.Column(db.Integer)
error_time = db.Column(db.DateTime)
stage = db.Column(db.Text)
error_string = db.Column(db.Text)
def __init__(self, error_string, error_time=datetime.utcnow(), task_id=None,stage='unknown'):
self.task_id = task_id
self.error_time = error_time
self.stage = stage
self.error_string = error_string
def log(self):
db.session.add(self)
db.session.commit()
def push(self):
global error_queue
error_queue.put_nowait(self)
пример из приложения:
app = Flask(__name__)
app.secret_key = os.environ.get('PYTHON_SECRET_KEY')
db.init_app(app)
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+mysqlconnector://scott:tiger@localhost:3306/myschema"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
@app.route('/create_all')
def create_tables():
db.create_all()
return 'done'
@app.route('/collect_all')
def collect_data():
if sys.platform.startswith('win'):
collector = DataCollector(basedir + r"\..\Data\DataToCollect.xlsx")
else:
collector = DataCollector(basedir + r"/../Data/DataToCollect.xlsx")
collector.CollectAll()
return 'done'
@app.route('/init_workers')
def init_workers():
init_threads
return 'done'
@app.route('/run_test', methods=['POST'])
def run_functions_test():
if request.method == "POST":
data = request.get_json()
#make a task and push it to tasks_queue
#make some queries
return f'your request had been submitted - you have{num_of_tasks} in the queue'
else:
return redirect('/login')
Теперь я хочу обрабатывать запросы пользователей и сохранять результаты в базе данных (MySql), используя многопоточность. Чтобы избежать проблем параллелизма, я назначил один рабочий поток для таблицы - таблицу задач, таблицу результатов и таблицу журнала ошибок. запросы на запись в конкретную таблицу c помещаются в очередь запросов этой таблицы, и, как уже было сказано, только один рабочий записывает их в свою таблицу.
здесь я инициализирую потоки:
def init_threads():
global num_of_analyser_workers
global threads_dict
error_logger = Thread(target=Worker.error_logger_worker)
error_logger.start()
threads_dict['error_logger'] = error_logger
threads_dict['analyser_workers'] = []
for _ in range(num_of_analyser_workers):
p = Thread(target=Worker.analyser_worker)
p.start()
threads_dict['analyser_workers'].append(p)
task_logger = Thread(target=Worker.task_logger_worker)
task_logger.start()
threads_dict['task_logger'] = task_logger
results_logger_worker = Thread(target=Worker.results_logger_worker)
results_logger_worker.start()
threads_dict['results_logger_worker'] = results_logger_worker
вот пример error_logger_worker:
class Worker:
@staticmethod
def error_logger_worker():
global error_queue
global run_queue_flag
while run_queue_flag:
if not error_queue.empty():
error_log = error_queue.get_nowait()
error_log.log()
error_queue.task_done()
Объект db.session не работает внутри потока, поскольку он не находит приложение, к которому он привязан.
Форма Документы Я согласен с тем, что мне нужно перейти от объекта conviniant db.session к flask_sqlalchemy_session и sqlalchemy session_maker.
Как мне перейти к использованию модулей flask_sqlalchemy_session и session_maker? Должен ли я просто: 1. в модели заменить
db = SQLALCHEMY()
на
engine = create_engine("mysql+mysqlconnector://scott:tiger@localhost:3306/myschema")
session_factory = sessionmaker(bind=engine) db.init_app(app)
2. в приложении замените
db.init_app(app)
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+mysqlconnector://scott:tiger@localhost:3306/myschema"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
на
db_session = flask_scoped_session(session_factory, app)
Evreywhere заменяет мои
db.session
префиксные вызовы на
db_session
Наконец, как мне создать / перенести объект сеанса в мой поток?
Спасибо!