Можно ли указать максимальное количество времени ожидания кода для запуска с Python? - PullRequest
6 голосов
/ 25 мая 2019

У меня есть фрагмент кода, из которого мне нужно получить вывод:

gps = get_gps_data()

Хотя, если вывод из get_gps_data() занимает слишком много времени, я бы хотел отменить процесс и установить gps на None. Модификация функции невозможна, поэтому есть ли способ указать максимальное количество времени ожидания для запуска и завершения некоторого кода, если он достигнет этого времени, скажем, 5 секунд?

1 Ответ

7 голосов
/ 25 мая 2019

Вы можете сделать что-то вроде ниже. Это будет работать для любой логики или функции, которую вы хотите проверить и остановить через некоторое время. Функция, которую вы хотите проверить и отменить, выполняется в отдельном потоке и контролируется. После ожидания в течение 3 секунд он отменяется.

Просто поместите свой код в функцию test (вы можете переименовать функции). test функция пытается спать в течение 100 секунд (может быть любая логика). Но основной вызов future.result(3) ждет только 3 секунды, иначе выдается TimeoutError и следует какая-то другая логика.

import time
import concurrent.futures as futures


def test():
    print('Sleeping for 100 seconds')
    time.sleep(100)
    # Add your code here
    # gps = get_gps_data()
    return 'Done'


# Main
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test)
    try:
        resp = future.result(3)
    except futures.TimeoutError:
        print('Some other resp')
    else:
        print(resp)
    executor._threads.clear()
    futures.thread._threads_queues.clear()

Вы можете попытаться минимизировать время ожидания внутри функции test до некоторого значения менее 3 секунд. В этом случае будет использован исходный результат, возвращенный функцией test.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...