tqdm
не раскрывает эту информацию как часть своего открытого API, и я не рекомендую пытаться взломать вашу собственную. Тогда вы будете зависеть от деталей реализации tqdm
, которые могут измениться в любое время.
Однако это не должно помешать вам писать свои собственные. Достаточно просто установить цикл с таймером, и затем вы можете прервать цикл, если это займет слишком много времени. Вот быстрый грубый пример, который все еще использует tqdm
для обеспечения визуальной обратной связи:
import time
from tqdm import tqdm
def long_running_function(n, timeout=5):
start_time = time.time()
for _ in tqdm(list(range(n))):
time.sleep(1) # doing some expensive work...
elapsed_time = time.time() - start_time
if elapsed_time > timeout:
raise TimeoutError("long_running_function took too long!")
long_running_function(100, timeout=10)
Если вы запустите это, функция остановит свое собственное выполнение через 10 секунд, вызвав исключение. Вы можете перехватить это исключение на сайте вызова и ответить на него любым удобным для вас способом.
Если вы хотите быть умным, вы можете даже выделить это в tqdm
-обертной оболочке, подобной этой:
def timed_loop(iterator, timeout):
start_time = time.time()
iterator = iter(iterator)
while True:
elapsed_time = time.time() - start_time
if elapsed_time > timeout:
raise TimeoutError("long_running_function took too long!")
try:
yield next(iterator)
except StopIteration:
pass
def long_running_function(n, timeout=5):
for _ in timed_loop(tqdm(list(range(n))), timeout=timeout):
time.sleep(0.1)
long_running_function(100, timeout=5)