Как очистить после тайм-аута asyncio.wait_for? - PullRequest
0 голосов
/ 15 июня 2019

Моя цель - немного попрактиковаться в использовании библиотеки asyncio.Я прочитал несколько вводных руководств, и теперь я хотел бы написать немного кода самостоятельно.

Я хотел бы начать две простые задачи, которые в основном увеличивают общее значение, хранящееся во внешнем классе.Первый из них - автоматический: увеличение на 1 через 5 секунд.Вторая задача связана с пользователем: если вы введете какое-то значение в течение этих 5 секунд, оно также должно быть добавлено.

Проблема в том, что когда я не вводю никакого значения, мой цикл не закрывается -программа все еще активна и работает до тех пор, пока я не принудительно остановлю ее - тогда я получаю следующую ошибку:

2.py
[Auto_increment: ] This task will increment value after 5 seconds
[Manual increment: ] Waiting 5s for inc value:
Timeout
Loop finished. Value is 1
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Process finished with exit code 0

В основном после «Цикл завершен», есть конец программы, но когда значение не было введено в консольвход, программа просто зависает.Когда я вхожу в любой v

2.py
[Auto_increment: ] This task will increment value after 5 seconds
[Manual increment: ] Waiting 5s for inc value:
5
Loop finished. Value is 6

Process finished with exit code 0

Похоже, когда происходит TimeoutError, после asyncio.wait_for что-то не очищается.Можете ли вы помочь мне и сказать, что не так?Это мой код:

import asyncio
import sys


class ValContainer:
    _val = 0

    @staticmethod
    def inc_val(how_many=1):
        ValContainer._val += how_many

    @staticmethod
    def get_val() -> int:
        return ValContainer._val


async def auto_increment():
    print(f'[Auto_increment: ] This task will increment value after 5 seconds')
    await asyncio.sleep(5)
    ValContainer.inc_val()
    return True


async def manual_increment(loop):
    print(f'[Manual increment: ] Waiting 5s for inc value:')
    try:
        future = loop.run_in_executor(None, sys.stdin.readline)
        line = await asyncio.wait_for(future, 5, loop=loop)
        if line:
            try:
                how_many = int(line)
                ValContainer.inc_val(how_many)
            except ValueError:
                print('That\'s not a number!')

    except asyncio.TimeoutError:
        print('Timeout')
    finally:
        return True

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task_auto = loop.create_task(auto_increment())
    task_man = loop.create_task(manual_increment(loop))
    loop.run_until_complete(task_auto)
    loop.run_until_complete(task_man)
    print(f'Loop finished. Value is {ValContainer.get_val()}')
    loop.close()

1 Ответ

1 голос
/ 15 июня 2019

Вы создали отдельный поток в threadpoolexecutor, и эти на самом деле не могут быть отменены .Задача «* 1003» «делегат» отменяется, но вызов «* 1004» будет оставаться там бесконечно.Вы можете завершить его, нажав Enter, так как это дает вам полную строку на sys.stdin.

. Вы должны будете использовать один из обходных путей, чтобы отменить чтение здесь;обратите внимание, что вы не можете указать ThreadPoolExecutor использовать поток демона.

В случае ожидания ввода пользователя как отдельной задачи в контексте асинхронного программирования, вероятно, проще создать собственный поток, а нечем попросить THreadPoolExecutor управлять потоками за вас, так что вы можете установить daemon=True для этого потока и просто заставить процесс завершить этот поток при выходе.

...