Любое время ожидания concurrent.futures, которое действительно работает? - PullRequest
0 голосов
/ 02 января 2019

Попытка написать на основе процесса тайм-аут (синхронизация) по дешевке, как это:

from concurrent.futures import ProcessPoolExecutor

def call_with_timeout(func, *args, timeout=3):
    with ProcessPoolExecutor(max_workers=1) as pool:
        future = pool.submit(func, *args)
        result = future.result(timeout=timeout)

Но, похоже, аргумент timeout, переданный future.result , на самом деле не работает так, как рекламируется.

>>> t0 = time.time()
... call_with_timeout(time.sleep, 2, timeout=3)
... delta = time.time() - t0
... print('wall time:', delta)
wall time: 2.016767978668213

OK.

>>> t0 = time.time()
... call_with_timeout(time.sleep, 5, timeout=3)
... delta = time.time() - t0
... print('wall time:', delta)
# TimeoutError

Не в порядке - разблокируется через 5 секунд , а не через 3 секунды.

Связанные вопросы показывают, как это сделать с пулами потоков или с signal . Как отключить процесс, отправленный в пул, через n секунд, без использования какого-либо _private API многопроцессорной обработки? Жесткое уничтожение - это нормально, нет необходимости запрашивать полное отключение.

Ответы [ 2 ]

0 голосов
/ 02 января 2019

Возможно, вы захотите взглянуть на pebble.

Его ProcessPool был разработан для решения именно этой проблемы: включить таймаут и отменить выполнение задач без необходимости выключения всего пула.

Когда время ожидания истекает или отменяется, работник фактически прекращает работу, фактически останавливая выполнение запланированной функции.

Время ожидания:

pool = pebble.ProcessPool(max_workers=1)
future = pool.schedule(func, args=args, timeout=1)
try:
    future.result()
except TimeoutError:
    print("Timeout")

Пример:

def call_with_timeout(func, *args, timeout=3):
    pool = pebble.ProcessPool(max_workers=1)
    with pool:
        future = pool.schedule(func, args=args, timeout=timeout)
        return future.result()
0 голосов
/ 02 января 2019

Тайм-аут ведет себя как следует.future.result(timeout=timeout) останавливается после заданного времени ожидания. Завершение работы пул все еще ожидает завершения всех ожидающих фьючерсов, что приводит к неожиданной задержке.

Вы можете отключить фоновый режим, вызвав shutdown(wait=False), но в целомПрограмма Python не завершится, пока не завершится выполнение всех ожидающих фьючерсов:

def call_with_timeout(func, *args, timeout=3):
    pool = ProcessPoolExecutor(max_workers=1)
    try:
        future = pool.submit(func, *args)
        result = future.result(timeout=timeout)
    finally:
        pool.shutdown(wait=False)

Executor API не предлагает способа отменить вызов, который уже выполняется.future.cancel() может отменить только те звонки, которые еще не начались.Если вам нужна функция прерывания прерывания, вам, вероятно, следует использовать что-то отличное от concurrent.futures.ProcessPoolExecutor.

...