Проблема в apply_asyn c в многопроцессорном пуле - PullRequest
1 голос
/ 13 января 2020

Я использую пул многопроцессорной обработки в Python и метод .apply_async() для одновременной работы нескольких рабочих.

Но существует проблема из-за использования with вместо создания произвольного экземпляра.

Вот что я сделал до сих пор:


Фрагмент общего кода секции:

from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def worker(x):
    print(f"{x} started.")
    sleep(x)
    print(f"{x} finished.")
    return f"{x} finished."

result_list = []
def log_result(result):
    result_list.append(result)

Первый фрагмент кода, который хорошо работает через Python 2 способом:

tick = time()

pool = Pool()
for i in range(6):
    pool.apply_async(worker, args=(i, ), callback=log_result)
pool.close()
pool.join()

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Out:

1 started.
2 started.
0 started.
0 finished.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.022687673568726
['0 finished.', '1 finished.', '2 finished.', '3 finished.', '4 finished.', '5 finished.']
5

Второй фрагмент кода, который хорошо работает через Python 3 способ:

tick = time()

with ProcessPoolExecutor() as executor:
    for i in range(6):
        executor.submit(worker, i)

print('Total elapsed time: ', time() - tick)
print(i)  # Indicates that all iteration has been done.

Выход:

0 started.
0 finished.
1 started.
2 started.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.017550945281982
5

Дополнительно:

  • Может быть получено заключение, что Python 3 способ быстрее, чем Python 2 метод.

Вопрос:

Теперь проблема в том, что я хочу реализовать Python 2 с использованием with, например, Python 3 , но задачи не выполнены:

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Out:

Total elapsed time:  0.10628008842468262
[]
5

Однако, если я поставил sleep(1) после pool.apply_async(...) какой-то л Ите задачи будут завершены (создание блока):

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
        sleep(1)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Out:

0 started.
0 finished.
1 started.
2 started.
1 finished.
3 started.
4 started.
2 finished.
5 started.
3 finished.
Total elapsed time:  6.022568702697754
['0 finished.', '1 finished.', '2 finished.', '3 finished.']
5

Что я пропустил?

1 Ответ

1 голос
/ 13 января 2020

concurrent.futures.Executor и multiprocessing.Pool имеют две совершенно разные реализации диспетчера контекста реализации.

concurrent.futures.Executor вызывает shutdown(wait=True), эффективно ожидающих завершения всех поставленных в очередь заданий до sh согласно документация .

Вы можете избежать явного вызова этого метода, если используете оператор with, который отключит Executor (ожидая, как если бы Executor.shutdown () был вызван с параметром wait, установленным в True)

multiprocessing.Pool вызывает terminate вместо close, а затем join, что приводит к преждевременному прерыванию всех текущих заданий. В документации .

объекты пула теперь поддерживают протокол управления контекстом - см. Типы диспетчера контекста. enter () возвращает объект пула, а exit () вызывает terminate ().

Если вы хотите использовать multiprocessing.Pool вместе с его контекстом менеджер, вам нужно дождаться результатов самостоятельно.

with Pool() as pool:
    async_result = pool.apply_async(worker, args=(i,), callback=log_result)
    async_result.wait()
...