Многопоточность с колбой - PullRequest
0 голосов
/ 07 сентября 2018

Я хотел бы вызвать generate_async_audio_service из представления и заставить его асинхронно генерировать аудиофайлы для списка слов, используя пул потоков, а затем фиксировать их в базе данных.

Я продолжаю сталкиваться сошибка, связанная с тем, что я работаю вне контекста приложения, хотя каждый раз создаю новый экземпляр polly и s3.

Как создать / загрузить несколько аудиофайлов одновременно?

from flask import current_app, 
from multiprocessing.pool import ThreadPool
from Server.database import db
import boto3
import io
import uuid


def upload_audio_file_to_s3(file):
   app = current_app._get_current_object()
   with app.app_context():
      s3 = boto3.client(service_name='s3',
               aws_access_key_id=app.config.get('BOTO3_ACCESS_KEY'),
               aws_secret_access_key=app.config.get('BOTO3_SECRET_KEY'))
      extension = file.filename.rsplit('.', 1)[1].lower()
      file.filename = f"{uuid.uuid4().hex}.{extension}"
      s3.upload_fileobj(file,
         app.config.get('S3_BUCKET'),
         f"{app.config.get('UPLOADED_AUDIO_FOLDER')}/{file.filename}",
         ExtraArgs={"ACL": 'public-read', "ContentType": file.content_type})
      return file.filename

def generate_polly(voice_id, text):
   app = current_app._get_current_object()
   with app.app_context():
      polly_client = boto3.Session(
         aws_access_key_id=app.config.get('BOTO3_ACCESS_KEY'),                   
         aws_secret_access_key=app.config.get('BOTO3_SECRET_KEY'),
         region_name=app.config.get('AWS_REGION')).client('polly')
      response = polly_client.synthesize_speech(VoiceId=voice_id,
                     OutputFormat='mp3', Text=text)
      return response['AudioStream'].read()


def generate_polly_from_term(vocab_term, gender='m'):
   app = current_app._get_current_object()
   with app.app_context():
      audio = generate_polly('Celine', vocab_term.term)
      file = io.BytesIO(audio)
      file.filename = 'temp.mp3'
      file.content_type = 'mp3'
      return vocab_term.id, upload_audio_file_to_s3(file)

def generate_async_audio_service(terms):
   pool = ThreadPool(processes=12)
   results = pool.map(generate_polly_from_term, terms)
   # do something w/ results

1 Ответ

0 голосов
/ 07 сентября 2018

Это не обязательно конкретный ответ, но вместо того, чтобы помещать вещи в комментарии, я помещу его здесь.

Celery - менеджер задач для python. Причина, по которой вы захотите использовать это, заключается в том, что если у вас есть задачи, проверяющие Flask, но они требуют больше времени, чем интервал между входящими задачами, тогда определенные задачи будут заблокированы, и вы не получите все свои результаты. Чтобы это исправить, вы передаете его другому процессу. Это выглядит так:

1) Client sends a request to Flask to process audio files

2) The files land in Flask to be processed, Flask will send an asyncronous task to Celery.

3) Celery is notified of the task and stores its state in some sort of messaging system (RabbitMQ and Redis are the canonical examples)

4) Flask is now unburdened from that task and can receive more

5) Celery finishes the task, including the upload to your database

Celery и Flask - это два отдельных процесса Python, взаимодействующих друг с другом. Это должно удовлетворить ваш многопоточный подход. Вы также можете получить состояние из задачи через Flask, если хотите, чтобы клиент проверял, что задача была / не была выполнена. Маршрут в вашей колбе app.py будет выглядеть так:

@app.route('/my-route', methods=['POST'])
def process_audio():
    # Get your files and save to common temp storage
    save_my_files(target_dir, files)

    response = celery_app.send_tast('celery_worker.files', args=[target_dir])
    return jsonify({'task_id': response.task_id})

Где celery_app происходит из другого модуля worker.py:

import os
from celery import Celery

env = os.environ

# This is for a rabbitMQ backend
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'amqp://0.0.0.0:5672/0')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'rpc://')

celery_app = Celery('tasks', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND)

Тогда ваш процесс сельдерея настроит работника что-то вроде:

from celery import Celery
from celery.signals import after_task_publish

env = os.environ
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'rpc://')

# Set celery_app with name 'tasks' using the above broker and backend
celery_app = Celery('tasks', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND)

@celery_app.task(name='celery_worker.files')
def async_files(path):
    # Get file from path
    # Process
    # Upload to database
    # This is just if you want to return an actual result, you can fill this in with whatever
    return {'task_state': "FINISHED"}

Это относительно просто, но может послужить отправной точкой. Я скажу, что некоторые настройки и настройки Celery не всегда наиболее интуитивны, но это сделает ваше приложение фляги доступным для тех, кто хочет отправлять на него файлы, не блокируя ничего другого.

Надеюсь, это несколько полезно

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...