Многопроцессорная обработка Python pool.map не работает параллельно - PullRequest
0 голосов
/ 07 ноября 2019

Я написал простую параллельную программу на Python

import multiprocessing as mp
import time

def test_function(i):
    print("function starts" + str(i))
    time.sleep(1)
    print("function ends" + str(i))

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    pool.map(test_function, [i for i in range(4)])
    pool.close()
    pool.join()

Что я ожидаю увидеть в выводе:

function starts0
function starts2
function starts1
function starts3
function ends1
function ends3
function ends2
function ends0

Что я на самом деле вижу:

function starts1
function ends1
function starts3
function ends3
function starts2
function ends2
function starts0
function ends0

Когда я смотрю в выводе, это похоже на pool.map запуск функции и ожидание, пока она не будет выполнена, а затем запуск другой, но когда я вычисляю продолжительность всей программы, она составляет около 2 секунд, и это невозможно, если test_function непараллельная работа


Редактировать:

Этот код хорошо работает в MacOS и Linux, но он не показывает ожидаемый результат в windows 10 . версия Python 3.6.4

Ответы [ 2 ]

1 голос
/ 07 ноября 2019

Документация multiprocessing.Pool() (с тех пор Py27, включая ) ясно в намеренно блокирует при обработке очереди. of-звонков, созданных с помощью сгенерированного итератором набора только -4-вызовов, произведенных последовательно из приведенного выше примера.

Документация multiprocessing -модуль говорит об этомPool.map() метод:

map(func, iterable[, chunksize])

Параллельный эквивалент встроенной функции map() (хотя она поддерживает только один итеративный аргумент), Блокируется до тех пор, пока результат не будет готов.

Это должно быть наблюдаемое поведение, в то время как различные методы создания экземпляров будут иметь разные накладные расходы (связанные с копированием процесса).

В любом случае, mp.cpu_count() нужно , а не - это количество ядер ЦП, которые будут выполнены для выполнения таких рабочих задач экземпляра .Pool(), из-за O / S(политики ограничений, связанные с пользователем / процессом) настройки соответствия:

Ваш код должен будет "подчиняться" подмножеству тех ядер ЦП, которые разрешено использовать любым таким multiprocessing -запрошенный подпроцесс,
, число которых не превышает: len( os.sched_getaffinity( 0 ) )


Лучший следующий шаг: переоценить весь ваш код выполнения кода. system

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect                                      # /13000484/mnogoprotsessornaya-obrabotka-python-pool-map-ne-rabotaet-parallelno

def test_function( i = -1 ):
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                                                callerframerecord = inspect.stack()[1] # 1 represents line at caller
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )
    pass;                                                               _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format(                 _CALLER_.function, _CALLER_.lineno )
            )
    time.sleep( 10 )
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )                 # 1 represents line at caller
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i )
            )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    #------mp.Pool()-----------------------------------------------------
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    #---.map()--------------------------------------?
    #---.map(                                       ?
    pool.map( test_function, [i for i in range(4) ] )
    #---.map(                                       ?
    #---.map()--------------------------------------?
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    #---.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    #---.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic()          ) )
    print( "EXECUTED on {0:}".format(              platform.version()        ) )
    print( "USING: python-{0:}:".format(           platform.python_version() ) )

может выглядеть примерно так на Linux-классе O / S:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d

EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:

Проверьте скрытые детали - что ваш O/ S использует для вызова test_function() - mapstar() (не бЭто был универсальный выбор) было выбрано локальным O / S класса SMP-linux для его метода создания экземпляра подпроцесса по умолчанию, выполняемого с помощью 'fork'.

0 голосов
/ 07 ноября 2019

Я подозреваю, что вы можете столкнуться с общей ошибкой в ​​многопроцессорной обработке:

Печать в общие журналы / экран из нескольких потоков (или процессов) выполнения (одновременно) может привести к сбивающим с толку результатам!

Это также объясняет, почему вы видите различное поведение в зависимости от ОС. Разные ОС решат это немного по-разному. Базовая схема буферизации, управление доступом и т. Д. Будут иметь значение.

Возможно, вы получаете ожидаемую многопроцессорность, но ваша распечатка может вводить в заблуждение.

Iзнаю, что вы предоставили этот код в качестве примера, чтобы продемонстрировать реальную проблему. Итак, просто вернитесь к исходному коду и снова рассмотрите вышеупомянутый часто пропускаемый факт: печать (или запись в файл) обращается к общему ресурсу. Вам может понадобиться блокировка или организация очередей или другие методы. Не зная деталей вашей настоящей проблемы, больше ничего нельзя предложить.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...