Синхронизация многопроцессорного скрипта - PullRequest
0 голосов
/ 04 апреля 2019

Я столкнулся со странной проблемой синхронизации при использовании многопроцессорного модуля.

Рассмотрим следующий сценарий.У меня есть такие функции:

import multiprocessing as mp

def workerfunc(x):
    # timehook 3
    # something with x
    # timehook 4

def outer():

    # do something

    mygen = ... (some generator expression)

    pool = mp.Pool(processes=8)

    # time hook 1
    result = [pool.apply(workerfunc, args=(x,)) for x in mygen]
    # time hook 2

if __name__ == '__main__':
    outer()

Я использую модуль времени, чтобы получить произвольное представление о том, как долго выполняются мои функции.Я успешно создал 8 отдельных процессов, которые завершаются без ошибок.Самое длительное время для завершения работника составляет около 130 мс (измеряется между временными крюками 3 и 4).

Я ожидал (поскольку они работают параллельно), что время между крючками 1 и 2 будет примерно одинаковым,Удивительно, но в результате я получаю 600 мс.

Моя машина имеет 32 ядра и должна легко справляться с этим.Кто-нибудь может подсказать, откуда взялась эта разница во времени?

Спасибо!

Ответы [ 2 ]

1 голос
/ 04 апреля 2019

Вы используете pool.apply, который блокирует.Вместо этого используйте pool.apply_async, и тогда все вызовы функций будут выполняться параллельно, и каждый из них немедленно возвратит объект AsyncResult.Вы можете использовать этот объект, чтобы проверить, когда процессы завершены, а затем получить результаты, используя этот объект.

1 голос
/ 04 апреля 2019

Поскольку вы используете многопроцессорность, а не многопоточность, ваша проблема с производительностью не связана с GIL (глобальная блокировка интерпретатора Python).

Я нашел интересную ссылку, объясняющую это на примере, вы можете найти ее внижняя часть этого ответа.

GIL не препятствует запуску процесса на другом процессоре компьютера.Это просто позволяет только одному потоку запускаться одновременно в интерпретаторе.

Таким образом, многопроцессорная обработка, а не многопоточность, позволит вам достичь истинного параллелизма.

Позволяет понять это через некоторый сравнительный анализ, потому что только это приведетвам верить тому, что сказано выше.И да, это должен быть способ учиться - испытать это, а не просто прочитать или понять это.Потому что, если вы что-то испытали, никакие аргументы не могут убедить вас в противоположных мыслях.

import random
from threading import Thread
from multiprocessing import Process
size = 10000000   # Number of random numbers to add to list
threads = 2 # Number of threads to create
my_list = []
for i in xrange(0,threads):
    my_list.append([])
def func(count, mylist):
    for i in range(count):
        mylist.append(random.random())
def multithreaded():
    jobs = []
    for i in xrange(0, threads):
        thread = Thread(target=func,args=(size,my_list[i]))
        jobs.append(thread)
    # Start the threads
    for j in jobs:
        j.start() 
    # Ensure all of the threads have finished
    for j in jobs:
        j.join()

def simple():
    for i in xrange(0, threads):
        func(size,my_list[i])

def multiprocessed():
    processes = []
    for i in xrange(0, threads):
        p = Process(target=func,args=(size,my_list[i]))
        processes.append(p)
    # Start the processes
    for p in processes:
        p.start()
    # Ensure all processes have finished execution
    for p in processes:
        p.join()
if __name__ == "__main__":
    multithreaded()
    #simple()
    #multiprocessed()

Дополнительная информация

Здесь Вы можете найти источник этой информации и более подробное техническое объяснение (бонус: там также есть цитаты Гвидо Ван Россума :))

...