У меня есть несколько маршрутов API, которые возвращают данные путем индивидуального запроса к базе данных.
Теперь я пытаюсь создать панель мониторинга, которая запрашивает API выше. Как поместить вызовы API в очередь, чтобы они выполнялись асинхронно?
Я пытался
await queue.put({'response_1': await api_1(**kwargs), 'response_2': await api_2(**kwargs)})
Кажется, что данные возвращаются во время выполнения задачи поставить в очередь.
Теперь я использую
await queue.put(('response_1', api_1(**args_dict)))
в производителе и в потребителе, я анализирую кортеж и выполняю вызовы API, которые, как мне кажется, ' Я делаю неправильно.
Вопрос1 Есть ли лучший способ сделать это?
Это код, который я использую для создания задач
producers = [create_task(producer(**args_dict, queue)) for row in stats]
consumers = [create_task(consumer(queue)) for row in stats]
await gather(*producers)
await queue.join()
for con in consumers:
con.cancel()
Вопрос2 Должен ли я использовать create_task или sure_future? Извините, если это повторяется, но я не могу понять разницу, и после поиска в Интернете я стал более смущенным.
Я использую FastAPI, пакеты баз данных (asyn c).
I ' используя кортеж вместо словаря, как await queue.put ('response_1', api_1 (** kwargs))
./app/dashboard.py:90: RuntimeWarning: coroutine 'api_1' was never awaited
item: Tuple = await queue.get_nowait()
Мой код для потребителя:
async def consumer(return_obj: dict, que: Queue):
item: Tuple = await queue.get_nowait()
print(f'consumer took {item[0]} from queue')
return_obj.update({f'{item[0]}': await item[1]})
await queue.task_done()
, если я не t использовать get_nowait, потребитель застревает, потому что очередь может быть пустой, но если я использую get_nowait, выше будет показана ошибка. Я не определил максимальную длину очереди
----------- РЕДАКТИРОВАТЬ -----------
Производитель
async def producer(queue: Queue, **kwargs):
await queue.put('response_1', api_1(**kwargs))