У меня есть несколько инструментов, которые может использовать каждый пользователь, и я написал их на языке Python.Я работал с брокером сообщений RabbitMQ, чтобы получить запрос от пользователя и затем запустить задачу. Поэтому, когда каждый пользователь отправляет запрос на сервер, сервер обрабатывает все входящие задачи в одном цикле (потоке) как асинхронный, а затем ожидает весь тяжелый процесс в программеи не слушайте другое сообщение до последней выполненной работы.Все мои инструменты дают результат (функции генератора), а затем помещают их в очередь (rabbitmq).Как я могу запустить новый поток (новую задачу), который отправляет результаты инструментов пользователю и не ждет всех задач?
Это для асинхронного и многозадачного программирования с python
Класс WorkerProxy:
.
.
.
async def create_proccess(self, inputs):
exchange, message = inputs
data = dict()
data['Result'] = json.dumps({"Status": '[+] Operation start'})
data['Timestamp'] = str(datetime.now())
data['Level'] = 'information'
message_pack = json.loads(str(message.body.decode()))
await self.send_message(
str(json.dumps(data)), exchange, message.correlation_id,
message.reply_to
)
for result in select_tool(message_pack):
await self.send_message(
str(result), exchange,
message.correlation_id, message.reply_to
)
data['Result'] = json.dumps(
{"Status": '[+] Operation done successfully'}
)
data['Timestamp'] = str(datetime.now())
data['Level'] = 'information'
await self.send_message(
str(json.dumps(data)), exchange, message.correlation_id,
message.reply_to
)
async def send_message(
self, message: str, exchange,
correlation_id, reply_to
):
await exchange.publish(
Message(
body=message.encode(),
correlation_id=correlation_id
),
routing_key=reply_to
)
async def on_message(self, exchange: Exchange, message: IncomingMessage):
''' TODO => here we must build rpc_server for
each client and create specific queue for them
'''
with message.process():
# print("[X] Received message: %r" % message)
print("=> Message: %r" % message.body)
asyncio.ensure_future(self.create_proccess((exchange, message)))
.
.
.
Я ожидаю, что, когда каждое сообщение, полученное из очереди, сервер создаст новый параллельный поток с другими, чтобы отправить результат инструмента как ответное сообщение пользователю