Как использовать concurrent.futures с таймаутами? - PullRequest
15 голосов
/ 28 июня 2011

Я пытаюсь заставить тайм-ауты работать в python3.2 с помощью модуля concurrent.futures. Однако, когда это делает тайм-аут, это действительно не останавливает выполнение. Я пытался как с потоками, так и с исполнителями пула процессов, ни один из них не останавливал задачу, и только до ее завершения истекает время ожидания. Так кто-нибудь знает, возможно ли заставить это работать?

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0;
    for i in range(1, max_number + 1):
        last_number = i * i
    return last_number

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")

if __name__ == '__main__':
    main()

Ответы [ 2 ]

17 голосов
/ 29 июня 2011

Насколько я могу судить, TimeoutError фактически вызывается, когда вы ожидаете этого, а не после того, как задача завершена.

Однако ваша программа будет продолжать работать до тех пор, пока не будут выполнены все запущенные задачи. Это связано с тем, что выполняемые в настоящее время задачи (в вашем случае, вероятно, все отправленные вами задачи, так как размер пула равен количеству задач), на самом деле не «убиты».

Возникает ошибка TimeoutError, так что вы можете не ждать, пока задача будет завершена (и вместо этого делать что-то еще), но задача будет продолжаться, пока не будет завершена. И python не завершится, пока в потоках / подпроцессах вашего исполнителя есть незавершенные задачи.

Насколько я знаю, невозможно просто "остановить" выполнение в настоящее время Futures, вы можете только "отменить" запланированные задачи, которые еще не запущены. В вашем случае их не будет, но представьте, что у вас есть пул из 5 потоков / процессов, и вы хотите обработать 100 элементов. В какой-то момент может быть запланировано 20 выполненных задач, 5 запущенных задач и 75 задач. В этом случае вы сможете отменить эти 76 запланированных заданий, но выполняющиеся 4 будут продолжаться до завершения независимо от того, ожидаете вы результата или нет.

Даже если это не может быть сделано таким образом, я думаю, что должны быть способы достичь желаемого конечного результата. Может быть, эта версия может помочь вам в пути (не уверен, что она делает именно то, что вы хотели, но она может быть полезна):

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

class Task:
    def __init__(self, max_number):
        self.max_number = max_number
        self.interrupt_requested = False

    def __call__(self):
        print("Started:", datetime.datetime.now(), self.max_number)
        last_number = 0;
        for i in xrange(1, self.max_number + 1):
            if self.interrupt_requested:
                print("Interrupted at", i)
                break
            last_number = i * i
        print("Reached the end")
        return last_number

    def interrupt(self):
        self.interrupt_requested = True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
        tasks = [Task(num) for num in max_numbers]
        for task, future in [(i, executor.submit(i)) for i in tasks]:
            try:
                print(future.result(timeout=1))
            except concurrent.futures.TimeoutError:
                print("this took too long...")
                task.interrupt()


if __name__ == '__main__':
    main()

Создавая вызываемый объект для каждой «задачи» и передавая его исполнителю вместо простой функции, вы можете предоставить способ «прервать» задачу. Совет: удалите строку task.interrupt() и посмотрите, что произойдет, это может помочь понять моё длинное объяснение выше; -)

8 голосов
/ 23 июня 2017

Недавно я также затронул эту проблему, и, наконец, я нашел следующее решение, используя ProcessPoolExecutor:


def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")
            stop_process_pool(executor)

def stop_process_pool(executor):
    for pid, processes in executor._processes.items():
        process.terminate()
    executor.shutdown()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...