asyncio.Queue как поток производитель-потребитель на веб-сервере, таком как Quart - PullRequest
0 голосов
/ 27 мая 2020

Можно ли использовать asyncio.Queue с веб-сервером, например Quart , для связи между производителем и потребителем?

Вот что я пытаюсь сделать ....

from quart import Quart, request
import asyncio

queue = asyncio.Queue()
producers = []
consumers = []


async def producer(mesg):
    print(f'produced {mesg}')
    await queue.put(mesg)
    await asyncio.sleep(1) # do some work


async def consumer():
    while True:
        token = await queue.get()
        await asyncio.sleep(1) # do some work
        queue.task_done()
        print(f'consumed {token}')


@app.route('/route', methods=['POST'])
async def index():
    mesg = await request.get_data()
    try:
        p = asyncio.create_task(producer(mesg))
        producers.append(p)
        c = asyncio.create_task(consumer())
        consumers.append(c)
        return f"published message {mesg}", 200
    except Exception as e:
        logger.exception("Failed tp publish message %s!", mesg)
        return f"Failed to publish message: {mesg}", 400

if __name__ == '__main__':
    PORT = int(os.getenv('PORT')) if os.getenv('PORT') else 8050
    app.run(host='0.0.0.0', port=PORT, debug=True)

Это нормально работает. Но я не уверен, что это хорошая практика, потому что я не понимаю, как (где в моем коде) выполнить следующие шаги.

# Making sure all the producers have completed
await asyncio.gather(*producers)

#wait for the remaining tasks to be processed
await queue.join()

# cancel the consumers, which are now idle
for c in consumers:
    c.cancel()

EDIT-1:

Я пробовал с использованием @app.after_serving с некоторыми операторами logger.debug.

@app.after_serving
async def shutdown():
    logger.debug("Shutting down...")
    logger.debug("waiting for producers to finish...")
    await asyncio.gather(*producers)
    logger.debug("waiting for tasks to complete...")
    await queue.join()
    logger.debug("cancelling consumers...")
    for c in consumers:
        c.cancel()

Но операторы отладки не печатаются, когда hypercorn корректно завершает работу. Итак, я не уверен, действительно ли функция (выключение), обозначенная @app.after_serving, вызывается во время выключения.

Вот сообщение от hypercorn во время выключения

appserver_1  | 2020-05-29 15:55:14,200 - base_events.py:1490 -        create_server - INFO - <Server sockets=(<asyncio.TransportSocket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8080)>,)> is serving
appserver_1  | Running on 0.0.0.0:8080 over http (CTRL + C to quit)
Gracefully stopping... (press Ctrl+C again to force)

I используя kill -SIGTERM <PID>, чтобы сигнализировать о постепенном завершении процесса.

Ответы [ 2 ]

0 голосов
/ 29 мая 2020

Я бы поместил код очистки в функцию выключения after_serving ,

@app.after_serving
async def shutdown():
    # Making sure all the producers have completed
    await asyncio.gather(*producers)

    #wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

Что касается глобальных переменных, я обычно храню их в приложении напрямую, чтобы к ним можно было получить доступ через прокси current_app. Однако обратите внимание, что это (и ваше решение) работает только для одного процесса (рабочего), если вы хотите использовать несколько рабочих (или эквивалентных хостов), вам понадобится стороннее хранилище для этой информации, например, с помощью redis.

0 голосов
/ 28 мая 2020

Но я не уверен, что это хорошая практика.

Глобальные переменные, подобные тем, которые вы создали в своем примере, обычно не подходят для корпоративных решений. Когда дело касается глобальных переменных, особенно в Python. По моему опыту, передача переменных в функцию или класс является более чистым подходом. Однако я не знаю, как это сделать в квартах, поскольку я не использую эту библиотеку.

# Making sure all the producers have completed
#wait for the remaining tasks to be processed
# cancel the consumers, which are now idle

Обычно задачи очистки выполняются при выходе из события l oop и перед выходом из приложения . Я не знаю, как работает quart, но вы могли бы поместить этот logi c после app.run(), чтобы задачи очистки выполнялись после остановки события l oop. Это может варьироваться в зависимости от того, как ваше приложение закрывается. Поищите в документации какое-то событие «при выключении», к которому вы можете подключиться.

...