Вот способ, который использует более простой код установки, без threading.Event
, так как это не кажется необходимым для решения проблемы. По сути, вы можете создать future_b
как новый Future()
самостоятельно и использовать метод add_done_callback
для future_a
, чтобы установить результат future_b
. Здесь func_a
- это вычисление для вычисления результата future_a
, а func_b
- это вычисление для вычисления результата future_b
с использованием результата future_a
.
from concurrent.futures import ProcessPoolExecutor, Future
def func_a(x):
return 2 + x
def func_b(x):
return 10 * x
if __name__ == '__main__':
pool = ProcessPoolExecutor(3)
future_a = pool.submit(func_a, 3)
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
x = f.result()
y = func_b(x)
future_b.set_result(y)
future_a.add_done_callback(callback)
print(future_b.result()) # 50
Если вам нужна вспомогательная функция для этого, вы можете написать одну: map_future
принимает будущее и функцию отображения и возвращает новое отображенное будущее по мере необходимости. Эта версия обрабатывает исключение в случае, если f.result()
или func_b
выдает одно:
def map_future(future_a, func):
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
try:
x = f.result()
y = func(x)
future_b.set_result(y)
except Exception as e:
future_b.set_exception(e)
future_a.add_done_callback(callback)
return future_b
Предостережения: это противоречит рекомендациям в документации для класса Future
, которая гласит:
Future
экземпляры создаются Executor.submit()
и не должны создаваться напрямую, кроме как для тестирования.
Кроме того, если у вас есть ошибки, которые не являются подклассами Exception
в обратном вызове они будут «зарегистрированы и проигнорированы» согласно документам. Для простоты я решил поймать Exception
в этом коде, но вы можете предпочесть sys.exc_info()[0]
способ перехвата всех возможных вещей, которые могут быть подняты.