Многопоточность с асинхронными функциями для создания новых задач, которые не ждут других запущенных задач с питоном 3 - PullRequest
0 голосов
/ 25 июня 2019

У меня есть несколько инструментов, которые может использовать каждый пользователь, и я написал их на языке Python.Я работал с брокером сообщений RabbitMQ, чтобы получить запрос от пользователя и затем запустить задачу. Поэтому, когда каждый пользователь отправляет запрос на сервер, сервер обрабатывает все входящие задачи в одном цикле (потоке) как асинхронный, а затем ожидает весь тяжелый процесс в программеи не слушайте другое сообщение до последней выполненной работы.Все мои инструменты дают результат (функции генератора), а затем помещают их в очередь (rabbitmq).Как я могу запустить новый поток (новую задачу), который отправляет результаты инструментов пользователю и не ждет всех задач?

Это для асинхронного и многозадачного программирования с python

Класс WorkerProxy:

.
.
.
async def create_proccess(self, inputs):

        exchange, message = inputs

        data = dict()
        data['Result'] = json.dumps({"Status": '[+] Operation start'})
        data['Timestamp'] = str(datetime.now())
        data['Level'] = 'information'

        message_pack = json.loads(str(message.body.decode()))

        await self.send_message(
            str(json.dumps(data)), exchange, message.correlation_id,
            message.reply_to
        )

        for result in select_tool(message_pack):
            await self.send_message(
                str(result), exchange,
                message.correlation_id, message.reply_to
            )

        data['Result'] = json.dumps(
            {"Status": '[+] Operation done successfully'}
        )
        data['Timestamp'] = str(datetime.now())
        data['Level'] = 'information'

        await self.send_message(
            str(json.dumps(data)), exchange, message.correlation_id,
            message.reply_to
        )

    async def send_message(
        self, message: str, exchange,
        correlation_id, reply_to
    ):
        await exchange.publish(
            Message(
                body=message.encode(),
                correlation_id=correlation_id
            ),
            routing_key=reply_to
        )

    async def on_message(self, exchange: Exchange, message: IncomingMessage):

        ''' TODO => here we must build rpc_server for
            each client and create specific queue for them
        '''

        with message.process():
            # print("[X] Received message: %r" % message)
            print("=> Message: %r" % message.body)

            asyncio.ensure_future(self.create_proccess((exchange, message)))

.
.
.

Я ожидаю, что, когда каждое сообщение, полученное из очереди, сервер создаст новый параллельный поток с другими, чтобы отправить результат инструмента как ответное сообщение пользователю

1 Ответ

0 голосов
/ 25 июня 2019

Таким образом, вам нужно создать некоторое количество потоков и один запрос для каждого потока.

Вы можете использовать Threading модуль для создания потоков (может быть класс, производный от Thread с помощью run() метода или простой оболочки вокруг вашей функции), и Queue для очереди.

очень простой пример:

from queue import Queue
from threading import Thread

def my_function(q):
    print q.get() # or whatever

q = Queue()
t = Thread(target=my_function, args=(q,))
t.start()

Теперь my_function будет выполняться в отдельном потоке и будет иметь доступ к этой очереди. Когда выполнение функции будет завершено, поток также будет закрыт, поэтому вам придется реализовать некоторый цикл ожидания.

...