сохранить результаты ThreadPoolExecutor - PullRequest
0 голосов
/ 29 августа 2018

Я довольно новичок в параллельной обработке с "concurrent.futures" и тестирую несколько простых экспериментов. Код, который я написал, кажется, работает, но я не уверен, как сохранить результаты. Я попытался создать список («фьючерсы») и добавить к нему результаты, но это значительно замедляет процедуру. Мне интересно, есть ли лучший способ сделать это. Спасибо.

import concurrent.futures
import time

couple_ods= []
futures=[]

dtab={}
for i in range(100):
    for j in range(100):
       dtab[i,j]=i+j/2
       couple_ods.append((i,j))

avg_speed=100
def task(i):
    origin=i[0]
    destination=i[1]
    time.sleep(0.01)
    distance=dtab[origin,destination]/avg_speed
    return distance
start1=time.time()
def main():
    with concurrent.futures.ThreadPoolExecutor() as executor:
       for number in couple_ods:
          future=executor.submit(task,number)
          futures.append(future.result())

if __name__ == '__main__':
    main()
end1=time.time()

1 Ответ

0 голосов
/ 29 августа 2018

Когда вы вызываете future.result(), он блокируется, пока значение не будет готово. Таким образом, вы не получаете никаких преимуществ от параллелизма - вы запускаете одну задачу, ждете, пока она закончится, запускаете другую, ждете, пока она закончится, и так далее.

Конечно, ваш пример не принесет пользы от потоков. Ваши задачи не делают ничего, кроме вычисления на Python с привязкой к процессору, что означает, что (по крайней мере, в CPython, MicroPython и PyPy, которые являются единственными полными реализациями, которые поставляются с concurrent.futures), GIL (Global Interpreter Lock) предотвратит больше чем одна из ваших тем от прогресса за один раз.

Надеюсь, ваша настоящая программа отличается. Если он выполняет операции, связанные с вводом / выводом (создание сетевых запросов, чтение файлов и т. Д.), Или использует библиотеку расширений, такую ​​как NumPy, которая высвобождает GIL при интенсивной работе ЦП, то он будет работать нормально. Но в противном случае вы захотите использовать ProcessPoolExecutor здесь.


В любом случае, вы хотите добавить future в список, так что вы получите список всех фьючерсов, прежде чем ждать любого из них:

for number in couple_ods:
    future=executor.submit(task,number)
    futures.append(future)

И затем, после того как вы запустили все задания, вы можете начать их ждать. Есть три простых варианта, и один сложный, когда вам нужно больше контроля.


(1) Вы можете просто зациклить их, чтобы дождаться их в том порядке, в котором они были отправлены:

for future in futures:
    result = future.result()
    dostuff(result)

(2) Если вам нужно подождать, пока все они будут закончены, прежде чем выполнять какую-либо работу, вы можете просто позвонить wait:

futures, _ = concurrent.futures.wait(futures)
for future in futures:
    result = future.result()
    dostuff(result)

(3) Если вы хотите обработать каждый из них, как только он будет готов, даже если они вышли из строя, используйте as_completed:

for result in concurrent.futures.as_completed(futures):
    dostuff(result)

Обратите внимание, что примеры, использующие эту функцию в документах, предоставляют некоторый способ определить, какая задача завершена. Если вам это нужно, это может быть так же просто, как передать каждому индекс, затем return index, real_result, а затем вы можете for index, result in … для цикла.

(4) Если вам нужно больше контроля, вы можете зациклить wait на том, что уже сделано:

while futures:
    done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
    for future in done:
        result = future.result()
        dostuff(result)

Этот пример делает то же самое, что и as_completed, но вы можете написать на нем небольшие вариации, чтобы делать разные вещи, например, ждать, пока все будет сделано, но отменять рано, если что-то вызывает исключение.


Для многих простых случаев вы можете просто использовать метод map исполнителя, чтобы упростить первый вариант. Это работает так же, как встроенная функция map, вызывая функцию один раз для каждого значения в аргументе, а затем давая вам что-то, что вы можете зациклить, чтобы получить результаты в том же порядке, но это происходит параллельно. Итак:

for result in executor.map(task, couple_ods):
    dostuff(result)
...