Завершите работу ThreadPoolExecutor до завершения всех задач - PullRequest
1 голос
/ 16 июня 2020

Я пытаюсь перебрать большой список строк (примерно 80 000), сгенерировать HTTP-запрос и обработать ответы, пока не получу правильный. Я передаю список и функцию, которая выполняет запрос функции ThreadPoolExecutor.map(), а затем просматриваю результаты по мере их поступления.

После получения правильного ответа от HTTP-сервера я хочу отменить все оставшиеся фьючерсы и закройте скрипт. Я бы предпочел, для простоты программирования, не отслеживать каждое будущее самому.

Я пробовал использовать shutdown(), но независимо от того, укажу я или нет ждать, сценарий все равно не закончится, пока все фьючерсы в очереди заполнены. В результате, если строка 2000 моего списка является правильным значением, мне все равно придется ждать завершения следующих 78000 фьючерсов, что может занять значительное время.

Есть ли способ сообщить ThreadPoolExecutor, что остальные задачи не нужны и их даже не нужно запускать?

Ответы [ 3 ]

1 голос
/ 16 июня 2020

Можно cancel() фьючерс. Я вижу, что вы указали в вопросе, что вам не нравится отслеживать фьючерсы, но это может быть ваш лучший выбор, и это не кажется таким сложным:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def process(duration):
    print(f"processing with duration {duration}")
    time.sleep(duration)
    if duration == 3:
        return "result found"

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(process, i) for i in range(80000)]
    for future in as_completed(futures):
        if future.result() == "result found":
            executor.shutdown(wait=False)
            print("shutdown")
            for f in futures:
                if not f.done():
                    f.cancel()
            break
print("about to exit")
1 голос
/ 16 июня 2020

Если вы можете переключиться на Python 3.9, у него есть эта функция встроенная в метод выключения :

Если cancel_futures имеет значение True, этот метод отменит все отложенные фьючерсы, которые исполнитель еще не запустил. Любые завершенные или запущенные фьючерсы не будут отменены, независимо от значения cancel_futures.

0 голосов
/ 16 июня 2020

Вы можете использовать объект Event для управления выполнением функции transform .

import concurrent.futures
import threading

evt = threading.Event()
def transform(arg):
  if evt.is_set():
    return None

  ...
  return ret

with concurrent.futures.ThreadPoolExecutor() as pool:
  for result in pool.map(transform, data):
    if result == ...:
      evt.set()
      break

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...