Как обеспечить тайм-аут на каждое будущее в итераторе concurrent.futures? - PullRequest
0 голосов
/ 31 марта 2020

Документация относительно времени ожидания для concurrent.futures очень сложна для понимания. В простом случае я хотел бы использовать ProcessPoolExecutor, вызывая .submit в al oop, который просматривает список рабочих функций. Я хочу, чтобы каждый из этих Future объектов имел связанный тайм-аут в 10 минут, но в противном случае он завершится асинхронно.

Мой первый подход состоял в том, чтобы попытаться использовать функцию as_completed, которая создает итератор объектов Future и выдает следующий после завершения. as_completed принимает параметр timeout, но в документации сказано, что это время ожидания относится к первому моменту, когда вызывается as_completed, и не обязательно к времени жизни любого объекта Future.

Например, предположим, что ProcessPoolExecutor имеет только 3 рабочих процесса, но список объектов Future содержит 10 элементов. 7 из элементов могут находиться в необработанном состоянии до 10 минут, пока обрабатываются первые 3 элемента. Вскоре после этого тайм-аут от as_completed будет отключен, что приведет к сбою, даже если каждый индивидуум Future, возможно, сам достиг 10-минутного ограничения.

Обратите внимание, что те же ограничения, которые применяются для as_completed также будет применяться для wait, а wait труднее использовать для этого варианта использования из-за ограниченных опций возврата, которые он поддерживает.

Моя следующая мысль заключалась в использовании timeout параметр, который future.result позволяет и вызов f.result(timeout=600) для каждого f (Future) в моем списке фьючерсов. Однако на самом деле нет способа установить этот тайм-аут, не требуя на самом деле блокирующего результата. Если вы перебираете список фьючерсов и вызываете f.result(...), этот вызов блокируется в течение указанного времени ожидания.

С другой стороны, вы также не можете комбинировать f.result с as_completed либо в наивном, но на первый взгляд правильный путь, такой как

[f.result(timeout=600) for f in as_completed(futures_list)]

... потому что итерация as_completed обманчиво ожидает асинхронно по завершении фьючерса и возвращает только то, что .result вызвано после того, как они уже завершено.

Исходя из этого, каков правильный шаблон для создания списка Future с, каждый из которых имеет свой индивидуальный тайм-аут, а затем ожидает их асинхронно до окончания sh?

1 Ответ

0 голосов
/ 31 марта 2020

Похоже, что в этом виде асинхронного контекста невозможно указать время ожидания на будущее. Доступные функции API wait и as_completed идут по более простому пути, поддерживая глобальное время ожидания для всех задач в итерируемом из Future объектов и не пытаясь измерить время, с которого Future впервые начинает активно работать в состоянии обработки.

Я выбрал обходной путь, заключающийся в разделении моего списка задач на набор фрагментов и использовании as_completed для каждого фрагмента. Размер чанка устанавливается равным количеству рабочих, которое настроено для использования моего ProcessPoolExecutor, так что я могу быть несколько уверен, что «глобальное» время ожидания as_completed тайно функционирует как время ожидания на будущее, так как все задачи активно обрабатываются сразу. Недостатком является несколько меньшее использование, потому что пул процессов не может захватить следующую задачу Future, когда задачи заканчиваются sh рано; он должен ждать всю следующую серию задач. Для меня это нормально, но это значительный недостаток удобства использования concurrent.futures, который я должен выбрать.

Вот пример кода. Предположим, что my_task_list уже содержит функции с некоторыми или всеми необходимыми аргументами, связанными с помощью functools.partial или другими способами. Вы можете изменить это так, чтобы аргументы передавались в отдельной итерируемой кортеже или диктовке и передавались в submit по мере необходимости.

my_task_list = #... define your list of task functions
num_workers = #... set number of workers
my_timeout = #... define your timeout
with ProcessPoolExecutor(max_workers=num_workers) as pool:
    all_results = []
    for chunk_start in range(0, len(my_task_list), num_workers):
        chunk = my_task_list[chunk_start:chunk_start + num_workers]
        # could extract parameters to pass for this task chunk here.
        futures = [pool.submit(task) for task in chunk]
        all_results += [
            f.result() for f in as_completed(futures, timeout=my_timeout)
        ]
    return all_results

Обратите внимание, что если вы выберете num_workers больше, чем число доступных процессоров до ProcessPoolExecutor вы получите больше задач, чем процессоров в данном чанке, и вернетесь к ситуации, когда время ожидания as_completed не будет корректно применяться к времени выполнения каждой задачи, что, вероятно, приведет к тому же виду ошибок тайм-аута, как будто просто используя as_completed или wait в общем списке задач без чанкинга.

...