Вы можете сделать что-то вроде ниже. Это будет работать для любой логики или функции, которую вы хотите проверить и остановить через некоторое время. Функция, которую вы хотите проверить и отменить, выполняется в отдельном потоке и контролируется. После ожидания в течение 3 секунд он отменяется.
Просто поместите свой код в функцию test
(вы можете переименовать функции). test
функция пытается спать в течение 100 секунд (может быть любая логика). Но основной вызов future.result(3)
ждет только 3 секунды, иначе выдается TimeoutError и следует какая-то другая логика.
import time
import concurrent.futures as futures
def test():
print('Sleeping for 100 seconds')
time.sleep(100)
# Add your code here
# gps = get_gps_data()
return 'Done'
# Main
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test)
try:
resp = future.result(3)
except futures.TimeoutError:
print('Some other resp')
else:
print(resp)
executor._threads.clear()
futures.thread._threads_queues.clear()
Вы можете попытаться минимизировать время ожидания внутри функции test
до некоторого значения менее 3 секунд. В этом случае будет использован исходный результат, возвращенный функцией test
.