Я хочу реализовать приложение с API через FastAPI, которое выполняет следующие действия:
Каждый вызов API создает задачу. Эта задача помещается в очередь брокером задач и вытягивается или проталкивается по каналу сообщения распределенными рабочими процессами. Эти рабочие обрабатывают задачу и получают результат, который транслируют по каналу сообщений. Посредник задач забирает эту трансляцию и, следовательно, результат, и удаляет задачу из очереди. Вызов API возвращается с полученным таким образом результатом.
В основном это работает. Задача создается, брокер задач добавляет задачу в свою очередь, передает ее работнику, получает результат от работников и снова удаляет его. Только последняя часть, с которой я не работал: вызов API, возвращающий результат.
Не знаю, как это реализовать. Я использую asyncio и мне нужен какой-то обратный вызов.
Упрощенное впечатление от кода выглядит следующим образом.
API:
async def post_document(document: BaseDocument):
"""Create the document with a specific type and an optional name given in the payload"""
task = DocumentTask({
'channel': 'worker',
'document': document.dict(),
'subtype': "POST_DOCUMENT"
})
result = await task_broker.give_task(task)
return result
Брокер задач
async def give_task(self, task_obj):
self.add_task_to_queue(task_obj)
self.message_channel.publish(task_obj)
А затем асинхронно рабочий выбирает задачу, обрабатывает ее, публикует задачу результата, которую подхватывает брокер задач, после чего брокер задач удаляет исходную задачу методом remove_task_from_queue. Но не знаете, как связать это с вызовом API. Любая помощь?