Игнорирование неиспользованных фьючерсов с ThreadPoolExecutor - PullRequest
0 голосов
/ 02 января 2019

Я запускаю две функции, fast() и slow(), параллельно, используя ThreadPoolExecutor.Если fast() возвращает результат не None, я бы хотел использовать его, в противном случае используйте результат slow().Вот пример:

from concurrent.futures import ThreadPoolExecutor
from time import sleep

def fast():
    sleep(2)
    return 'fast'

def slow():
    sleep(4)
    return 'slow'

def run_parallel():
    with ThreadPoolExecutor() as executor:
        fast_future = executor.submit(fast)
        slow_future = executor.submit(slow)

        fast_result = fast_future.result()
        if fast_result is not None:
            slow_future.cancel()
            return fast_result

        return slow_future.result()

print(run_parallel())

Запуск с выводом:

$ time python example.py 
fast

real    0m4.058s
user    0m0.041s
sys 0m0.011s

Так как fast() вернул не-None значение, я ожидал, что это займет 2 с вместо 4 с,тем более, что у меня есть эта строка для slow_future.cancel().

Мой идеальный синтаксис для этого будет выглядеть примерно так:

combined_future = fast_future.orElse(slow_future)
return combined_future.result()

Что я могу сделать, чтобы получить это ожидаемое поведение?

1 Ответ

0 голосов
/ 14 июня 2019

Это связано с тем, что медленное будущее не может быть отменено (slow_future.cancel() возвращает False), поэтому исполнитель пула потоков ожидает поток.Попробуйте что-то вроде:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

abort_flag = False


def fast():
    for i in range(20):
        if abort_flag:
            return None
        sleep(.1)
    return 'fast'


def slow():
    for i in range(40):
        if abort_flag:
            return None
        sleep(.1)
    return 'slow'


def run_parallel():
    global abort_flag
    with ThreadPoolExecutor() as executor:
        abort_flag = False
        fast_future = executor.submit(fast)
        slow_future = executor.submit(slow)
        for f in as_completed((fast_future, slow_future)):
            result = f.result()
            if result is not None:
                abort_flag = True
                return result


print(run_parallel())
...