Flask - задание не работает как фоновый процесс - PullRequest
0 голосов
/ 12 сентября 2018

Я пытаюсь запустить приложение Flask, которое состоит из:

  1. Получение запросов API на лету
  2. Загрузка каждого запроса в SQLalchemy базу данных
  3. Запуск заданий 1 и 2 в качестве фонового процесса

Для этого у меня есть следующий код:

from flask import Flask
from flask import current_app
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import queue

app = Flask(__name__)
q = queue.Queue()

def build_cache(): 
    # 1. Yielding API requests on the fly
    track_and_features = spotify.query_tracks() # <- a generator
    while True:
        q.put(next(track_and_features))


def upload_cache(tracks_and_features):
    # 2. Uploading each request to a `SQLalchemy` database
    with app.app_context():      
        Upload_Tracks(filtered_dataset=track_and_features)

    return "UPLOADING TRACKS TO DATABASE" 


@app.route('/cache')
def cache():
    # 3. Do `1` and `2` as a background process
    with concurrent.futures.ThreadPoolExecutor() as executor:

        future_to_track = {executor.submit(build_cache): 'TRACKER DONE'}

        while future_to_track:
            # check for status of the futures which are currently working
            done, not_done = concurrent.futures.wait(
                                                future_to_track, 
                                                timeout=0.25,
                                                return_when=concurrent.futures.FIRST_COMPLETED) 

            # if there is incoming work, start a new future
            while not q.empty():

                # fetch a track from the queue
                track = q.get()

                # Start the load operation and mark the future with its TRACK
                future_to_track[executor.submit(upload_cache, track)] = track
            # process any completed futures
            for future in done:
                track = future_to_track[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (track, exc))

                del future_to_track[future]

    return 'Cacheing playlist in the background...'

Все вышеперечисленные работы, НО НЕКАК ФОНОВЫЙ ПРОЦЕСС.Приложение зависает при вызове cache() и возобновляет работу только после завершения процесса.

Я запускаю его с gunicorn -c gconfig.py app:app -w 4 --threads 12

что я делаю не так?


РЕДАКТИРОВАТЬ : если все упроститьвыполните отладку и просто напишите:

# 1st background process
def build_cache():
    # only ONE JOB
    tracks_and_features = spotify.query_tracks() # <- not a generator               
    while True:
         print next(tracks_and_features)

# background cache
@app.route('/cache')
def cache():
    executor.submit(build_cache)
    return 'Cacheing playlist in the background...'

ТО затем процесс запускается в фоновом режиме.

Однако, если я добавлю другое задание:

def build_cache():

    tracks_and_features = spotify.query_tracks()
    while True:
        Upload_Tracks(filtered_dataset=next(tracks_and_features) #SQLalchemy db

фон снова не работает.

Короче говоря:

Фон работает, только если я запускаю ОДНО задание за раз (что было ограничением идеи использования очередей в первомместо) .

Кажется, проблема в связывании фонового процесса с SQLalchemy, не знаю.полностью потерян здесь.

Ответы [ 2 ]

0 голосов
/ 21 сентября 2018

Попробуйте создать ThreadPoolExecutor вне обработчика маршрута.

from flask import Flask

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time


def foo(*args):
    while True:
        print('foo', args)
        time.sleep(10)


app = Flask(__name__)

executor = concurrent.futures.ThreadPoolExecutor()


@app.route('/cache')
def cache():
    executor.submit(foo, '1')
    executor.submit(foo, '2')
    return 'in cache'
0 голосов
/ 16 сентября 2018

Все еще не уверен, что вы имели в виду под

Я имею в виду, что приложение ожидает выполнения всех запросов при входе в систему и только затем переходит на домашнюю страницу.Он должен сразу перейти на домашнюю страницу с запросами, сделанными в фоновом режиме

Здесь есть несколько проблем:

  • Ваша очередь является глобальной для процесса т.е. существует только одна очередь на одного работника-оружейника;вы, вероятно, хотите, чтобы очередь была привязана к вашему запросу, чтобы несколько запросов не разделяли одну и ту же очередь в памяти.Попробуйте использовать контекстные локальные компьютеры
  • Если UploadTracks выполняет запись в базу данных, возможно, блокировка таблицы.Проверьте ваши индексы и проверьте ожидания блокировки в вашей базе данных.
  • SQLAlchemy может быть настроен с небольшим пулом соединений , а второй UploadTracks ожидает, пока первый вернет соединение.

В первом примере конечная точка ожидает завершения всех фьючерсов перед возвратом, тогда как во втором примере конечная точка возвращается сразу после отправки задач исполнителю.Если вы хотите, чтобы колба реагировала быстро, пока задачи еще выполнялись в фоновых потоках, удалите with concurrent.futures.ThreadPoolExecutor() as executor: и создайте глобальный пул потоков в верхней части модуля.

Используя with, менеджер контекста ждет всех отправленных задач перед выходом, но я не уверен, является ли это вашей основной проблемой.

...