Насколько я могу судить, TimeoutError фактически вызывается, когда вы ожидаете этого, а не после того, как задача завершена.
Однако ваша программа будет продолжать работать до тех пор, пока не будут выполнены все запущенные задачи. Это связано с тем, что выполняемые в настоящее время задачи (в вашем случае, вероятно, все отправленные вами задачи, так как размер пула равен количеству задач), на самом деле не «убиты».
Возникает ошибка TimeoutError, так что вы можете не ждать, пока задача будет завершена (и вместо этого делать что-то еще), но задача будет продолжаться, пока не будет завершена. И python не завершится, пока в потоках / подпроцессах вашего исполнителя есть незавершенные задачи.
Насколько я знаю, невозможно просто "остановить" выполнение в настоящее время Futures, вы можете только "отменить" запланированные задачи, которые еще не запущены. В вашем случае их не будет, но представьте, что у вас есть пул из 5 потоков / процессов, и вы хотите обработать 100 элементов. В какой-то момент может быть запланировано 20 выполненных задач, 5 запущенных задач и 75 задач. В этом случае вы сможете отменить эти 76 запланированных заданий, но выполняющиеся 4 будут продолжаться до завершения независимо от того, ожидаете вы результата или нет.
Даже если это не может быть сделано таким образом, я думаю, что должны быть способы достичь желаемого конечного результата. Может быть, эта версия может помочь вам в пути (не уверен, что она делает именно то, что вы хотели, но она может быть полезна):
import concurrent.futures
import time
import datetime
max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
class Task:
def __init__(self, max_number):
self.max_number = max_number
self.interrupt_requested = False
def __call__(self):
print("Started:", datetime.datetime.now(), self.max_number)
last_number = 0;
for i in xrange(1, self.max_number + 1):
if self.interrupt_requested:
print("Interrupted at", i)
break
last_number = i * i
print("Reached the end")
return last_number
def interrupt(self):
self.interrupt_requested = True
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
tasks = [Task(num) for num in max_numbers]
for task, future in [(i, executor.submit(i)) for i in tasks]:
try:
print(future.result(timeout=1))
except concurrent.futures.TimeoutError:
print("this took too long...")
task.interrupt()
if __name__ == '__main__':
main()
Создавая вызываемый объект для каждой «задачи» и передавая его исполнителю вместо простой функции, вы можете предоставить способ «прервать» задачу.
Совет: удалите строку task.interrupt()
и посмотрите, что произойдет, это может помочь понять моё длинное объяснение выше; -)