Когда вы вызываете future.result()
, он блокируется, пока значение не будет готово. Таким образом, вы не получаете никаких преимуществ от параллелизма - вы запускаете одну задачу, ждете, пока она закончится, запускаете другую, ждете, пока она закончится, и так далее.
Конечно, ваш пример не принесет пользы от потоков. Ваши задачи не делают ничего, кроме вычисления на Python с привязкой к процессору, что означает, что (по крайней мере, в CPython, MicroPython и PyPy, которые являются единственными полными реализациями, которые поставляются с concurrent.futures
), GIL (Global Interpreter Lock) предотвратит больше чем одна из ваших тем от прогресса за один раз.
Надеюсь, ваша настоящая программа отличается. Если он выполняет операции, связанные с вводом / выводом (создание сетевых запросов, чтение файлов и т. Д.), Или использует библиотеку расширений, такую как NumPy, которая высвобождает GIL при интенсивной работе ЦП, то он будет работать нормально. Но в противном случае вы захотите использовать ProcessPoolExecutor
здесь.
В любом случае, вы хотите добавить future
в список, так что вы получите список всех фьючерсов, прежде чем ждать любого из них:
for number in couple_ods:
future=executor.submit(task,number)
futures.append(future)
И затем, после того как вы запустили все задания, вы можете начать их ждать. Есть три простых варианта, и один сложный, когда вам нужно больше контроля.
(1) Вы можете просто зациклить их, чтобы дождаться их в том порядке, в котором они были отправлены:
for future in futures:
result = future.result()
dostuff(result)
(2) Если вам нужно подождать, пока все они будут закончены, прежде чем выполнять какую-либо работу, вы можете просто позвонить wait
:
futures, _ = concurrent.futures.wait(futures)
for future in futures:
result = future.result()
dostuff(result)
(3) Если вы хотите обработать каждый из них, как только он будет готов, даже если они вышли из строя, используйте as_completed
:
for result in concurrent.futures.as_completed(futures):
dostuff(result)
Обратите внимание, что примеры, использующие эту функцию в документах, предоставляют некоторый способ определить, какая задача завершена. Если вам это нужно, это может быть так же просто, как передать каждому индекс, затем return index, real_result
, а затем вы можете for index, result in …
для цикла.
(4) Если вам нужно больше контроля, вы можете зациклить wait
на том, что уже сделано:
while futures:
done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
for future in done:
result = future.result()
dostuff(result)
Этот пример делает то же самое, что и as_completed
, но вы можете написать на нем небольшие вариации, чтобы делать разные вещи, например, ждать, пока все будет сделано, но отменять рано, если что-то вызывает исключение.
Для многих простых случаев вы можете просто использовать метод map
исполнителя, чтобы упростить первый вариант. Это работает так же, как встроенная функция map
, вызывая функцию один раз для каждого значения в аргументе, а затем давая вам что-то, что вы можете зациклить, чтобы получить результаты в том же порядке, но это происходит параллельно. Итак:
for result in executor.map(task, couple_ods):
dostuff(result)