Когда и как использовать очереди asyncio? - PullRequest
0 голосов
/ 19 января 2020

У меня есть несколько маршрутов API, которые возвращают данные путем индивидуального запроса к базе данных.

Теперь я пытаюсь создать панель мониторинга, которая запрашивает API выше. Как поместить вызовы API в очередь, чтобы они выполнялись асинхронно?

Я пытался

await queue.put({'response_1': await api_1(**kwargs), 'response_2': await api_2(**kwargs)})

Кажется, что данные возвращаются во время выполнения задачи поставить в очередь.

Теперь я использую

await queue.put(('response_1', api_1(**args_dict)))

в производителе и в потребителе, я анализирую кортеж и выполняю вызовы API, которые, как мне кажется, ' Я делаю неправильно.

Вопрос1 Есть ли лучший способ сделать это?

Это код, который я использую для создания задач

producers = [create_task(producer(**args_dict, queue)) for row in stats]
consumers = [create_task(consumer(queue)) for row in stats]
await gather(*producers)
await queue.join()
for con in consumers:
    con.cancel()

Вопрос2 Должен ли я использовать create_task или sure_future? Извините, если это повторяется, но я не могу понять разницу, и после поиска в Интернете я стал более смущенным.

Я использую FastAPI, пакеты баз данных (asyn c).

I ' используя кортеж вместо словаря, как await queue.put ('response_1', api_1 (** kwargs))

./app/dashboard.py:90: RuntimeWarning: coroutine 'api_1' was never awaited
item: Tuple = await queue.get_nowait()

Мой код для потребителя:

async def consumer(return_obj: dict, que: Queue):
    item: Tuple = await queue.get_nowait()
    print(f'consumer took {item[0]} from queue')
    return_obj.update({f'{item[0]}': await item[1]})
    await queue.task_done()

, если я не t использовать get_nowait, потребитель застревает, потому что очередь может быть пустой, но если я использую get_nowait, выше будет показана ошибка. Я не определил максимальную длину очереди

----------- РЕДАКТИРОВАТЬ -----------

Производитель

async def producer(queue: Queue, **kwargs):
    await queue.put('response_1', api_1(**kwargs))

1 Ответ

1 голос
/ 20 января 2020

Вы можете удалить await из вашего первого фрагмента и отправить объект сопрограммы в очередь. Объект сопрограммы - это сопрограмма, которая была вызвана, но еще не ожидалась.

# producer:
await queue.put({'response_1': api_1(**kwargs),
                 'response_2': api_2(**kwargs)})
...

# consumer:
while True:
    dct = await queue.get()
    for name, api_coro in dct:
        result = await api_coro
        print('result of', name, ':', result)

Должен ли я использовать create_task или ensure_future?

Если аргумент является результатом вызова функции сопрограммы, вы должны использовать create_task (см. этот комментарий от Guido для объяснения). Как следует из названия, он вернет экземпляр Task, который управляет этой сопрограммой. Эту задачу также можно ожидать, но она продолжает выполняться в фоновом режиме.

ensure_future - это гораздо более специализированная функция, которая преобразует различные виды ожидаемых объектов в их соответствующее будущее. Это полезно, когда реализует функции, такие как asyncio.gather(), которые принимают различные виды ожидаемых объектов для удобства и должны преобразовать их в фьючерсы перед работой с ними.

...