У меня есть web-server
, разработанный в asyncio
(Service 1
). Сервер получает http-requests
от клиентов и для каждого http-request
публикует обмен message
на RabbitMQ. Существует еще одна масштабируемая служба (Service 2
), которая получает message
от RabbitMQ, выполняет некоторые вычисления, сохраняет результаты в Database
и устанавливает состояние Task
в done
(состояние сохраняется в Database
тоже). База данных: Postgresql
.
Сопрограмма веб-сервера, которая отправила сообщение на MQ
, должна понимать момент ( Как это сделать? ), когда Task
завершен, выполните дополнительную работу и отправьте ответ на http-client
.
Проблема заключается в разработке архитектуры этой системы, где этот сценарий может работать. Я рассматривал несколько вариантов.
Вариант 1. Используйте шаблон RabbitMQ RP C, как описано в https://www.rabbitmq.com/tutorials/tutorial-six-python.html
Но для моего В этом случае существует такой фактор, что число клиентов велико, данные каждого клиента не зависят от данных других клиентов, и ожидаются высокие нагрузки. Если я буду использовать одиночный RabbitMQ queue
, то для каждого одновременного http-client
Service 1's http-handler
получит некоррелированный MQ messages-responses
и должен будет отправить not acknowledge
на MQ
. Похоже, что нагрузка на сеть и процессор RabbitMQ будет происходить пропорционально количеству одновременных http-запросов Service 1
. Поэтому, вероятно, мне нужно использовать временный RabbitMQ queue
для каждого запроса клиента. Но я не могу точно понять, насколько это эффективно.
Вариант 2. Использование общей базы данных между Service 1
и Service 2
для хранения и отслеживания состояний Task
, и Сделайте обработчик запросов Service 1
следующим образом:
# Code of Service 1
async def http_request_handler(request):
await rabbitmq_publisher.publish(request['data_id'])
while True:
# Get state of task from Shared Database
state_in_db = await DBService.get_state_of_task_or_none(request['data_id'])
if not state_in_db:
continue
if state_in_db.state != 'DONE':
await asyncio.sleep(0.05)
continue
await extra_work()
return Response('ok')
# Code of Service 2
async def handle_mq_message(message):
await calculate(message)
# Set state of task in Shared Database
await DBService.set_state_of_task(message.data_id, 'DONE')
Но после исследований я не нашел никакой информации, это хороший шаблон.
Я могу ' Я не могу решить, какой подход выбрать. Может быть, есть лучший способ / какой-нибудь шаблон дизайна, чтобы сделать такие вещи более правильными?