Как функционально составить фьючерс? - PullRequest
0 голосов
/ 09 января 2020

У меня есть объект потока, который я не могу распределить по ProcessPoolExecutor, но хотел бы вернуть будущее. Если у меня уже есть будущее, есть ли способ применить его к завершенному значению, например, Future a -> (a -> b) -> Future b?

import concurrent.futures
import threading

def three(x):
    return 2+x


if __name__ == '__main__':
    trackedItem = (3, threading.Event())
    pool = concurrent.futures.ProcessPoolExecutor(3)
    poolJob = (q.submit(three, trackedItem[0]),trackedItem[1]) #(Future(int), Event)
    *** something magic goes here ***
    #Trying to transform it into Future(int,Event)

1 Ответ

1 голос
/ 17 января 2020

Вот способ, который использует более простой код установки, без threading.Event, так как это не кажется необходимым для решения проблемы. По сути, вы можете создать future_b как новый Future() самостоятельно и использовать метод add_done_callback для future_a, чтобы установить результат future_b. Здесь func_a - это вычисление для вычисления результата future_a, а func_b - это вычисление для вычисления результата future_b с использованием результата future_a.

from concurrent.futures import ProcessPoolExecutor, Future

def func_a(x):
    return 2 + x

def func_b(x):
    return 10 * x

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    future_a = pool.submit(func_a, 3)

    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)

    print(future_b.result()) # 50

Если вам нужна вспомогательная функция для этого, вы можете написать одну: map_future принимает будущее и функцию отображения и возвращает новое отображенное будущее по мере необходимости. Эта версия обрабатывает исключение в случае, если f.result() или func_b выдает одно:

def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b

Предостережения: это противоречит рекомендациям в документации для класса Future, которая гласит:

Future экземпляры создаются Executor.submit() и не должны создаваться напрямую, кроме как для тестирования.

Кроме того, если у вас есть ошибки, которые не являются подклассами Exception в обратном вызове они будут «зарегистрированы и проигнорированы» согласно документам. Для простоты я решил поймать Exception в этом коде, но вы можете предпочесть sys.exc_info()[0] способ перехвата всех возможных вещей, которые могут быть подняты.

...