Поскольку примеры асинхронного программирования все еще довольно редки, я пытаюсь придумать свое собственное использование нового async def
. Я подумал, что самым простым примером, который я мог бы сделать, был бы цикл обработки событий, который прослушивает вводимые пользователем данные и выводит их обратно.
Я хочу цикл обработки событий, который прослушивает клавиши, которые нажимает пользователь, и печатает их каждую секунду. Я пытаюсь сделать это, имея одну задачу, которая печатает из очереди клавиш, и вторую задачу, которая прослушивает нажатые клавиши и добавляет их в очередь.
Мне не хватает хорошего способа асинхронного прослушивания нажатий клавиш. Вот что у меня сейчас.
import asyncio
KEY_QUEUE = []
async def printer():
while True:
await asyncio.sleep(1)
print('In the last second you pressed:', *KEY_QUEUE)
KEY_QUEUE.clear()
async def listener():
while True:
... # await a key to be pressed and add it to KEY_QUEUE
loop = asyncio.get_event_loop()
loop.create_task(printer())
loop.create_task(listener())
loop.run_forever()
Ожидаемый результат будет выглядеть так
In the last second you pressed: h e l l o
In the last second you pressed: w o r
In the last second you pressed: l d
In the last second you pressed:
In the last second you pressed: i t w o
In the last second you pressed: r k s
Я сомневаюсь, что есть приемлемая сопрограмма, такая как asyncio.await_pressed_key
, как бы мы ее создали?
Любой другой подход приветствуется, так как моя цель - не заставить этот конкретный пример работать, а скорее генерировать значимые асинхронные примеры программирования.