Python и asyncio - как использовать run_in_executor (), чтобы не блокировать продолжающееся выполнение main? - PullRequest
0 голосов
/ 13 марта 2020

Я пытаюсь набрать скорость с Python и asyncio, и у меня возникают трудности с настройкой, поэтому основная часть программы не блокируется при запуске процесса asyncio. Например, рассмотрите этот фиктивный клиент, который использует threading для получения сообщений, и вставьте их в очередь сообщений, которую main затем может обработать, наряду с main, выполняя другие действия:

# mock_client.py

import threading
import time
import random
import collections

messageQueue = collections.deque()

def main():

    # start the thread running
    receiveMessagesThread = threading.Thread(target=receiveMessages)
    receiveMessagesThread.start()

    # in an actual program this would be a much longer while True loop, and getting the updated
    # messages from the message queue would be only one of many activities
    while True:
        # get and process all the messages in the message queue that have been queued up
        # since the last time around
        while len(messageQueue)  > 0:
            message = messageQueue.popleft()
            # an actual program would do something significant with these messages,
            # for this test app just print them out
            print('message = ' + str(message))
        # end while

        # an actual program would have many other activities to do here, use a random sleep to simulate this
        time.sleep(random.uniform(1.0, 2.0))
    # end while
# end main

def receiveMessages():
    while True:
        # there would be a time delay inbetween receiving messages in an actual program,
        # use a random sleep to simulate this
        time.sleep(random.uniform(0.1, 0.2))

        # an actual client would recive messages here and add them to the message queue,
        # to simulate this just make up a random number
        myRandInt = random.randint(1, 10)
        messageQueue.append(str(myRandInt))
    # end while
# end function

if __name__ == '__main__':
    main()

Я пытаюсь установить до эквивалента с asyncio вместо многопоточности, и до сих пор я не смог понять, как это сделать. В большинстве asyncio примеров используются loop.run_until_complete и loop.run_forever, которые я не могу поставить в начале main, потому что это заблокирует перед while True l oop и помешает main выполнить другие что нужно сделать.

После прочтения документации и некоторых других сообщений о переполнении стека, особенно этого , у меня сложилось впечатление, что вызов run_in_executor - это ответ, но я Мне неясно, как это настроить. Кто-нибудь может подсказать, как изменить вышеуказанное, чтобы использовать asyncio / run_in_executor вместо многопоточности?

--- Edit ---

На основе этого видео Я разработал этот пример, который существенно приближается к выполнению моих критериев:

# asyncio4.py

import asyncio
import random
import collections

messageQueue = collections.deque()

async def main():
    # in an actual program this would be a much longer while True loop, and getting the updated
    # messages from the message queue would be only one of many activities
    while True:
        # get and process all the messages in the message queue that have been queued up
        # since the last time around
        while len(messageQueue) > 0:
            message = messageQueue.popleft()
            # an actual program would do something significant with these messages,
            # for this test app just print them out
            print('message = ' + str(message))
        # end while

        # an actual program would have many other activities to do here, use a random sleep to simulate this
        await asyncio.sleep(random.uniform(1.0, 2.0))
    # end while
# end function

async def receiveMessages():
    while True:
        # there would be a time delay inbetween receiving messages in an actual program,
        # use a random sleep to simulate this
        await asyncio.sleep(random.uniform(0.1, 0.2))

        # an actual client would recive messages here and add them to the message queue,
        # to simulate this just make up a random number
        myRandInt = random.randint(1, 10)
        messageQueue.append(str(myRandInt))
    # end while
# end function

loop = asyncio.get_event_loop()

try:
    asyncio.ensure_future(main())
    asyncio.ensure_future(receiveMessages())
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.close()
# end try

Кажется, единственное, что осталось, - это заменить два ensure_future вызова одним run_in_executor вызовом, когда main не нужно будет украшать async. Кто-нибудь может подсказать, как это сделать?

--- Edit ---

Я, наверное, должен был упомянуть ранее, что в функции receiveMessages мне нужно использовать другую библиотеку, которая требует receiveMessages быть async, но я бы предпочел, чтобы main не был украшен async, поскольку это может наложить другие ограничения, о которых я не знаю. Необходимость использовать await asyncio.sleep(random.uniform(1.0, 2.0)) вместо простого time.sleep(random.uniform(1.0, 2.0)) - это один предел, о котором я знаю, но есть ли другие?

Также, как указывалось user4815162342, кажется, что concurrent.futures часто используется в сочетании с run_in_executor, например, в этом примере , но я не уверен, как адаптировать это к моему случаю выше.

...