Python create_task не работает в запущенном цикле событий - PullRequest
0 голосов
/ 31 мая 2019

У меня есть простой кусок кода, который некоторое время сводит меня с ума. Несколько дней назад я разместил этот вопрос с вопросом create_task не работает с input. Теперь я выяснил что-то связанное с этим. Я запускаю цикл обработки событий в отдельном потоке и помещаю в него задачи. Очень простой код.

import asyncio
import threading


async def printer(message):
    print(f'[printer] {message}')


def loop_runner(loop):
    loop.run_forever()


if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    t = threading.Thread(target=loop_runner, args=(event_loop,))
    t.start()

    for m in ['hello', 'world', 'foo', 'bar']:
        print(f'[loop running ?] {event_loop.is_running()}')
        event_loop.create_task(printer(m))

Ничего не печатается, кроме этих сообщений журнала.

[loop running ?] True
[loop running ?] True
[loop running ?] True
[loop running ?] True

Теперь, если я заблокирую поток потока событий и позволю ему работать после такой паузы.

def loop_runner(loop):
    time.sleep(1 / 1000)
    loop.run_forever()

Все работает и печатается

[loop running ?] False
[loop running ?] False
[loop running ?] False
[loop running ?] False
[printer] hello
[printer] world
[printer] foo
[printer] bar

На первый взгляд, задачи, созданные в цикле обработки событий, не выполняются. Но почему это так?

Я ничего не видел в документации. В большинстве примеров, которые я видел в Интернете, люди в цикле создают задачи из других сопрограмм и ожидают их. Но я думаю, что законно использовать задачи создания вне сопрограмм, если вы не хотите их ждать.

Ответы [ 2 ]

3 голосов
/ 31 мая 2019

При создании задачи вне потока цикла событий необходимо использовать asyncio.run_coroutine_threadsafe. Эта функция будет планировать сопрограмму потокобезопасным способом и уведомлять цикл обработки событий о том, что необходимо выполнить новую работу. Он также вернет объект concurrent.futures.Future, который можно использовать для блокировки текущего потока, пока не будет доступен результат.

На первый взгляд, задачи, созданные в цикле обработки событий, не выполняются. Но почему это так?

Вызов create_task недостаточен, поскольку не содержит кода для «пробуждения» цикла обработки событий. Это особенность - такой пробуждение обычно не требуется, и добавление его просто замедлит обычное однопоточное использование. Когда create_task вызывается из потока цикла событий, он находится внутри обратного вызова цикла событий, поэтому цикл событий может проверять свою очередь задач, как только он получает управление, когда он завершил выполнение обратного вызова. Но когда create_task вызывается из другого потока, цикл событий спит в ожидании ввода-вывода, поэтому для его активации требуется run_coroutine_threadsafe.

Чтобы проверить это, вы можете создать сопрограмму «сердцебиение», которая содержит только бесконечный цикл, который печатает что-то и ожидает asyncio.sleep(1). Вы увидите, что задачи, созданные с помощью create_task, выполняются вместе с сердцебиением, что также приводит к пробуждению цикла событий. В загруженных приложениях asyncio этот эффект может создать впечатление, что create_task из другого потока «работает». Однако на это никогда не следует полагаться, так как create_task не может реализовать надлежащую блокировку и может повредить внутренние компоненты цикла событий.

Я ничего такого не видел в документации.

Взгляните на раздел параллелизма и многопоточности .

0 голосов
/ 31 мая 2019

Ваш код работает так, как и ожидалось:

Следует отметить, что вызов этого (run_forever()) приводит к тому, что наш основной поток блокируется на неопределенный срок.

(https://tutorialedge.net/python/concurrency/asyncio-event-loops-tutorial/#the-run-forever-method)

см. Также Asyncio. Как использовать run_forever? для решений (loop.call_soon_threadsafe() и asyncio.run_coroutine_threadsafe()).

...