Python concurrent.futures запускает потоки в пуле, пока не будет найден результат - PullRequest
0 голосов
/ 03 июля 2018

Я хочу запускать потоки в ThreadPoolExecutor, пока один из них не даст мне конкретный результат. На данный момент мой код выглядит так:

pool = concurrent.futures.ThreadPoolExecutor(max_workers = 4)
futures = [pool.submit(g,i) for i in range (4)]
j = 4
for f in concurrent.futures.as_completed(futures):
    if f.result():
        break # OK the result is found
    else:
        j += 1
        futures.append(pool.submit(g,j))

но append в последней строке, похоже, не влияет на генератор as_completed. Есть ли способ добиться этого?

1 Ответ

0 голосов
/ 03 июля 2018

Вы можете просто продолжать проверять каждое будущее в последовательности, пока не найдете то, что готово.

from collections import deque
pool = concurrent.futures.ThreadPoolExecutor(max_workers = 4)

futures = deque(pool.submit(g,i) for i in range (4))
j = 4
result = False
while not result:
    while not futures[0].done():
        futures.rotate()
    future = futures.popleft()
    result = future.result()
    if not result:
        j += 1
        futures.append(pool.submit(g,j))

Вот аналогичное решение с использованием concurrent.futures.wait.
Сначала вызывается для тестирования:

import random
class F:
    def __init__(self, threshold=.03):
        self._bool = random.random() < threshold
    def __call__(self, n):
        self.n = n
        time.sleep(random.random())
        return self
    def __bool__(self):
        return self._bool
    def __str__(self):
        return f'I am number {self.n}'
    def __repr__(self):
        return f'I am number {self.n}'

Решение

pool = concurrent.futures.ThreadPoolExecutor(max_workers = 4)
j = 4
futures = [pool.submit(F(),i) for i in range(j)]
result = False
while not result:
    #print(f'there are {len(futures)} futures')
    maybe_futures = concurrent.futures.wait(futures, return_when='FIRST_COMPLETED')
    futures = maybe_futures.not_done
    # more than one may have completed(?)
    for future in maybe_futures.done:
        temp = future.result()
        if not temp:
            j += 1
            futures.add(pool.submit(F(),j))
        else:
            result = temp
            break

Другое решение, использующее обратный вызов (использует вызываемый выше ): не уверен, что произойдет, если будущее завершится до того, как будет добавлен обратный вызов.

completed_futures = collections.deque()
result = False
def callback(future, completed=completed_futures):
    completed.append(future)

j = 4

with concurrent.futures.ThreadPoolExecutor(max_workers = 4) as pool:
    #initial tasks
    for i in range(j):
        future = pool.submit(F(),i)
        future.add_done_callback(callback)
    while not result:    # is this the same as - while True: ?
        while not completed_futures:
            pass
        while completed_futures:
            future = completed_futures.popleft()
            result = future.result()
            if result:
                break
            j += 1
            future = pool.submit(F(),j)
            future.add_done_callback(callback)
print(result)
...