Неверное количество выполняющихся вызовов с ProcessPoolExecutor в Python - PullRequest
3 голосов
/ 13 июня 2019

В стандартном модуле Python concurrent.futures почему число выполняющихся вызовов в ProcessPoolExecutor равно max_workers + 1 вместо max_workers, как в ThreadPoolExecutor?Это происходит только тогда, когда количество отправленных вызовов строго превышает количество рабочих процессов пула.

Следующий фрагмент кода Python, который отправляет 8 вызовов двум работникам в ProcessPoolExecutor:

import concurrent.futures
import time


def call():
    while True:
        time.sleep(1)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(call) for _ in range(8)]
        time.sleep(5)

        for future in futures:
            print(future.running())

печатает это (3 текущих вызова; неожиданно, так как есть 2 рабочих):

Верно
Верно
Верно
Ложь
Ложь
Ложь
False
False

при использовании ThreadPoolExecutor распечатывает это (2 текущих вызова; ожидается):

True
True
False
Ложь
Ложь
Ложь
Ложь
Ложь

1 Ответ

1 голос
/ 13 июня 2019

Ну, я бы не стал слишком доверять этому running() методу. Кажется, что это не отражает действительное состояние бега.

Лучший способ удостовериться в состояниях процесса - заставить их что-то напечатать / обновить. Я решил создать общий словарь, используя объект multiprocessing.Manager().dict().

Этот объект, синхронизированный с процессом, можно безопасно просматривать и обновлять из любого процесса, и он имеет общее состояние даже в многопроцессорной среде.

Каждый раз, когда начинается процесс, обновляйте общий dict с помощью PID в качестве ключа и True в качестве значения. Установите False на выходе.

import concurrent.futures
import multiprocessing
import time,os


def call(shared_dict):
    shared_dict[os.getpid()] = True
    print("start",shared_dict)
    time.sleep(10)
    shared_dict[os.getpid()] = False
    print("end",shared_dict)


if __name__ == "__main__":

    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        shared_dict = multiprocessing.Manager().dict()
        futures = [executor.submit(call,shared_dict) for _ in range(8)]
        time.sleep(5)
        for future in futures:
            print(future.running())

вот вывод, который я получаю:

start {3076: True}
start {9968: True, 3076: True}
True
True
True
True
True
False
False
False
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
end {9968: False, 3076: False}

Как видите, у меня 5 запущенных процессов. В то время как мой словарь ясно показывает, что

  • одновременно выполняется не более 2 процессов
  • процессы создаются в начале только один раз, а затем повторно используются для выполнения дальнейших вызовов (в конце концов, это пул)

Давайте проверим очень минималистическую документацию :

running () Возвращает True, если вызов выполняется в данный момент и не может быть отменен.

Похоже, что он отражает состояние, связанное с возможностью отмены будущего выполнения объекта Future (поскольку он еще не был должным образом инициализирован / не подключен к очереди связи, и еще не пришло время его отменять), а скорее является фактическим " запущен "статус самого процесса.

Это, вероятно, то, что этот комментарий в исходном коде означает ниже set_running_or_notify_cancel определение:

Пометить будущее как работающее или обработать любые уведомления об отмене.

Если будущее было отменено (метод cancel () был вызван и возвращено значение True), то все потоки, ожидающие завершения в будущем (хотя вызовы as_completed () или wait ()) уведомляются, и возвращается значение False.

Если будущее не было отменено, оно переводится в состояние выполнения (будущие вызовы running () возвращают True) и возвращается True.

Еще раз, мы узнаем, что лучше попросить подпроцессы сотрудничать, публикуя их статус, а не пытаться вымогать его нечетко документированными методами.

...