Почему скрипт многопроцессорной обработки python через некоторое время замедляется? - PullRequest
0 голосов
/ 13 июля 2020

Я прочитал старый вопрос Почему этот python скрипт многопроцессорной обработки замедляется через некоторое время? и многие другие, прежде чем опубликовать этот. Они не отвечают на мою проблему.

ИДЕЯ СЦЕНАРИЯ. Скрипт генерирует массивы размером 256x256 в сериализованном формате l oop. Элементы массива вычисляются один за другим из списка, который содержит словари с соответствующими параметрами, по одному словарю на элемент массива (всего 256x256 на список). Список - это способ включить параллельные вычисления.

ПРОБЛЕМА. Вначале генерация данных ускоряется с десятка секунд до нескольких секунд. Затем, после нескольких итераций, он начинает замедляться на долю секунды с каждым новым массивом, сгенерированным до такой степени, что для вычисления чего-либо требуется вечность.

Дополнительная информация.

  1. I Я использую функцию pool.map. Внеся несколько небольших изменений, чтобы определить, какой элемент вычисляется, я также попытался использовать map_asyn c. К сожалению, это медленнее, потому что мне нужно запускать пул каждый раз, когда я заканчиваю sh вычисление массива.
  2. При использовании pool.map я инициализирую пул один раз, прежде чем что-нибудь начнется. Таким образом, я надеюсь сэкономить время на инициализацию пула по сравнению с map_asyn c.
  3. ЦП показывает низкое использование, до ~ 18%.
  4. В моем случае жесткий диск не является узким местом . Все данные, необходимые для расчетов, находятся в оперативной памяти. Я также не сохраняю данные на жестком диске, сохраняя все в ОЗУ.
  5. Я также проверил, сохраняется ли проблема, если я использую другое количество ядер, 2-24. Никаких изменений.
  6. Я провел несколько дополнительных тестов, запустив и завершив пул, a. каждый раз, когда создается массив, b. каждые 10 массивов. Я заметил, что в каждом случае выполнение кода замедляется по сравнению с временем выполнения предыдущего пула, т.е. если предыдущий замедлился до 5 секунд, другой будет 5.Xs и так далее. Единственный раз, когда выполнение не замедляется, - это когда я запускаю код последовательно.
  7. Рабочий env: Windows 10, Python 3.7, conda 4.8.2, Spyder 4.

ВОПРОС: Почему многопроцессорность замедляется через некоторое время в случае, когда задействованы только ЦП и ОЗУ (без замедления жесткого диска)? Есть идеи?

ОБНОВЛЕННЫЙ КОД:

import multiprocessing as mp 
from tqdm import tqdm
import numpy as np
import random

def wrapper_(arg):
    return tmp.generate_array_elements(
        self=arg['self'], 
        nu1=arg['nu1'], 
        nu2=arg['nu2'], 
        innt=arg['innt'], 
        nu1exp=arg['nu1exp'], 
        nu2exp=arg['nu2exp'], 
        ii=arg['ii'], 
        jj=arg['jj'],
        llp=arg['self'].llp, 
        rr=arg['self'].rr, 
    )

class tmp:
    def __init__(self, multiprocessing, length, n_of_arrays):
        self.multiprocessing = multiprocessing
        self.inshape = (length,length)
        self.length = length
        self.ll_len = n_of_arrays
        self.num_cpus = 8
        self.maxtasksperchild = 10000
        self.rr = 0
        
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_ll(self, lp): 
        return [random.sample((range(self.length)),int(np.random.random()*12)+1) for ii in range(self.ll_len)]
    
    """original function is different, modified to return something"""
    def get_ip(self): return np.random.random()
    
    """original function is different, modified to return something"""
    def get_op(self): return np.random.random(self.length)
    
    """original function is different, modified to return something"""    
    def get_innt(self, nu1, nu2, ip):
        return nu1*nu2/ip
    
    """original function is different, modified to return something"""    
    def __get_pp(self, nu1):
        return np.exp(nu1)
    
    """dummy function for the example purpose"""
    def dummy_function(self):
        """do important stuff"""
        return 
    
    """dummy function for the example purpose"""
    def dummy_function_2(self, result):
        """do important stuff"""
        return np.reshape(result, np.inshape)
    
    """dummy function for the example purpose"""
    def dummy_function_3(self):
        """do important stuff"""
        return
    
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_llp(self, ll, lp):
        return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]
        
    """NOTE, lp is not used here for the example purpose but
    in the original code, it's very important variable containg
    relevant data for calculations"""
    def generate(self, lp={}):
        """create a list that is used to the creation of 2-D array"""
        """providing here a dummy pp param to get_ll"""
        ll = self.get_ll(lp)
        ip = self.get_ip()
        
        self.op = self.get_op()
        
        """length of args_tmp = self.length * self.length = 256 * 256"""
        args_tmp = [
            {'self': self, 
             'nu1': nu1,  
             'nu2': nu2, 
             'ii': ii, 
             'jj': jj,
             'innt': np.abs(self.get_innt(nu1, nu2, ip)),
             'nu1exp': np.exp(1j*nu1*ip),
             'nu2exp': np.exp(1j*nu2*ip),
             } for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]
        
        pool = {}
        if self.multiprocessing: 
            pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)
        
        """number of arrays is equal to len of ll, here 300"""
        for ll_ in tqdm(ll):
            """Generate data"""
            self.__generate(ll_, lp, pool, args_tmp)
        
        """Create a pool of CPU threads"""
        if self.multiprocessing: 
            pool.terminate()

    def __generate(self, ll, lp, pool = {}, args_tmp = []):
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function()
        
        self.llp = self.get_llp(ll, lp)
        """originally the values is taken from lp"""
        self.rr = self.rr
        
        if self.multiprocessing and pool: 
            result = pool.map(wrapper_, args_tmp)
        else: 
            result = [wrapper_(arg) for arg in args_tmp]
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        result = self.dummy_function_2(result)
    
    """original function is different"""
    def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii = 0, jj = 0, rr=0):
        if rr == 1 and self.inshape[0] - 1 - jj < ii: 
            return 0
        elif rr == -1 and ii > jj: 
            return 0
        elif rr == 0:
            """do nothing"""
        
        ll1 = []
        ll2 = []
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function_3()
        
        for kk, ll in enumerate(llp):
            ll1.append(
               self.__get_pp(nu1) * 
               nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
            )
            ll2.append(
               self.__get_pp(nu2) * 
               nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
            )
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result
    
    
    
g = tmp(False, 256, 300)
g.generate()

Ответы [ 2 ]

1 голос
/ 14 июля 2020

Трудно сказать, что происходит в вашем алгоритме. Я мало знаю о многопроцессорности, но, вероятно, безопаснее придерживаться функций и не передавать себя в объединенные процессы. Это делается, когда вы передаете args_tmp в wrapper_ в pool.map(). Также в целом постарайтесь уменьшить объем данных, передаваемых между родительским и дочерним процессами в целом. Я пытаюсь переместить генерацию списка lp в рабочие пула, чтобы предотвратить передачу лишних данных.

Наконец, хотя я не думаю, что это имеет значение в этом примере кода, но вы должны очистить его после используя пул или пул с with.

Я переписал часть вашего кода, чтобы попробовать, и это кажется быстрее, но я не на 100%, что он соответствует вашему алгоритму. Некоторые имена переменных трудно различить guish.

Это работает намного быстрее для меня, но трудно сказать, точно ли оно создает ваши решения. Мой окончательный вывод, если это верно, заключается в том, что дополнительная передача данных значительно замедляла рабочих пула.

#main.py
if __name__ == '__main__':
    import os
    import sys
    file_dir = os.path.dirname(__file__)
    sys.path.append(file_dir)

    from tmp import generate_1
    parallel = True
    generate_1(parallel)


#tmp.py
import multiprocessing as mp 
import numpy as np
import random
from tqdm import tqdm
from itertools import starmap

def wrapper_(arg):
    return arg['self'].generate_array_elements(
        nu1=arg['nu1'],
        nu2=arg['nu2'],
        ii=arg['ii'],
        jj=arg['jj'],
        lp=arg['self'].lp,
        nu1exp=arg['nu1exp'],
        nu2exp=arg['nu2exp'],
        innt=arg['innt']
    )

def generate_1(parallel):
    """create a list that is used to the creation of 2-D array"""
    il = np.random.random(256)
    """generating params for parallel data generation"""
    """some params are also calculated here to speed up the calculation process
    because they are always the same so they can be calculated just once"""
    """this code creates a list of 256*256 elements"""
    args_tmp = [
    {
     'nu1': nu1,  
     'nu2': nu2, 
     'ii': ii, 
     'jj': jj,
     'innt': np.random.random()*nu1+np.random.random()*nu2,
     'nu1exp': np.exp(1j*nu1),
     'nu2exp': np.exp(1j*nu2),
    } for ii, nu1 in enumerate(il) for jj, nu2 in enumerate(il)]

    """init pool"""
    

    """get list of arrays to generate"""
    ip_list = [random.sample((range(256)),int(np.random.random()*12)+1) for ii in range(300)]

    map_args = [(idx, ip, args_tmp) for idx, ip in enumerate(ip_list)]
    """separate function to do other important things"""
    if parallel:
        with mp.Pool(8, maxtasksperchild=10000) as pool:
            result = pool.starmap(start_generate_2, map_args)
    else:
        result = starmap(start_generate_2, map_args)
    # Wrap iterator in list call.
    return list(result)

def start_generate_2(idx, ip, args_tmp):
    print ('starting {idx}'.format(idx=idx))
    runner = Runner()
    result = runner.generate_2(ip, args_tmp)
    print ('finished {idx}'.format(idx=idx))
    return result

class Runner():

    def generate_2(self, ip, args_tmp):
        """NOTE, the method is much more extensive and uses other methods of the class""" 
        """so it must remain a method of the class that is not static!"""
        self.lp = [{'a': np.random.random(), 'b': np.random.random()} for ii in ip]
        """this part creates 1-D array of the length of args_tmp, that's 256*256"""
        result = map(wrapper_, [dict(args, self=self) for args in args_tmp])
        """it's then reshaped to 2-D array"""
        result = np.reshape(list(result), (256,256))
        return result
    
    def generate_array_elements(self, nu1, nu2, ii, jj, lp, nu1exp, nu2exp, innt):
        """doing heavy calc"""
        """"here is something else"""
        if ii > jj: return 0
            
        ll1 = []
        ll2 = []
        for kk, ll in enumerate(lp):
            ll1.append(nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random()))
            ll2.append(nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random()))
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result


Я добавляю общий шаблон c, чтобы показать архитектуру, в которой вы отделить подготовку общих аргументов от средства выполнения задач и по-прежнему использовать классы. Стратегия здесь заключалась бы в том, чтобы не создавать слишком много задач (300 кажется быстрее, чем пытаться разделить их до 64000) и не передавать слишком много данных каждой задаче. Интерфейс launch_task должен быть как можно более простым, что в моем рефакторинге вашего кода было бы эквивалентно start_generate_2.

import multiprocessing
from itertools import starmap


class Launcher():
    def __init__(self, parallel):
        self.parallel = parallel

    def generate_shared_args(self):
        return [(i, j) for i, j in enumerate(range(300))]

    def launch(self):
        shared_args = self.generate_shared_args()
        if self.parallel:
            with multiprocessing.Pool(8) as pool:
                result = pool.starmap(launch_task, shared_args)
        else:
            result = starmap(launch_task, shared_args)
        # Wrap in list to resolve iterable.
        return list(result)


def launch_task(i, j):
    task = Task(i, j)
    return task.run()


class Task():

    def __init__(self, i, j):
        self.i = i
        self.j = j

    def run(self):
        return self.i + self.j


if __name__ == '__main__':
    parallel = True
    launcher = Launcher(parallel)
    print(launcher.launch())

Здесь есть предупреждение об очистке пула в документации по пулу. : https://docs.python.org/3/library/multiprocessing.html#multiprocessing .pool.Pool

В первом пункте обсуждается отказ от общего состояния и особенно больших объемов данных. https://docs.python.org/3/library/multiprocessing.html#programming - рекомендации

0 голосов
/ 15 июля 2020

Предложения Яна Уилсона были очень полезны, и одно из них помогло решить проблему. Поэтому его ответ отмечен как правильный.

По его мнению, для меньшего количества задач лучше вызывать пул. Поэтому вместо вызова pool.map для каждого массива (N), который создается 256 256 раз для каждого элемента массива (всего N 256 * 256 задач), теперь я вызываю pool.map для функции, которая вычисляет весь массив всего N раз. Вычисление массива внутри функции выполняется сериализованным способом.

Я все еще отправляю self в качестве параметра, потому что он нужен в функции, но не влияет на производительность.

Это небольшое изменение ускоряет вычисление массива с 7-15 сек до 1.5it / s-2s / it!

ТЕКУЩИЙ КОД:

import multiprocessing as mp 
import tqdm
import numpy as np
import random

def wrapper_(arg):
    return tmp.generate_array_elements(
        self=arg['self'], 
        nu1=arg['nu1'], 
        nu2=arg['nu2'], 
        innt=arg['innt'], 
        nu1exp=arg['nu1exp'], 
        nu2exp=arg['nu2exp'], 
        ii=arg['ii'], 
        jj=arg['jj'],
        llp=arg['self'].llp, 
        rr=arg['self'].rr, 
    )

"""NEW WRAPPER HERE"""
"""Sending self doesn't have bad impact on the performance, at least I don't complain :)"""
def generate(arg):
   tmp._tmp__generate(arg['self'], arg['ll'], arg['lp'], arg['pool'], arg['args_tmp'])

class tmp:
    def __init__(self, multiprocessing, length, n_of_arrays):
        self.multiprocessing = multiprocessing
        self.inshape = (length,length)
        self.length = length
        self.ll_len = n_of_arrays
        self.num_cpus = 8
        self.maxtasksperchild = 10000
        self.rr = 0
        
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_ll(self, lp): 
        return [random.sample((range(self.length)),int(np.random.random()*12)+1) for ii in range(self.ll_len)]
    
    """original function is different, modified to return something"""
    def get_ip(self): return np.random.random()
    
    """original function is different, modified to return something"""
    def get_op(self): return np.random.random(self.length)
    
    """original function is different, modified to return something"""    
    def get_innt(self, nu1, nu2, ip):
        return nu1*nu2/ip
    
    """original function is different, modified to return something"""    
    def __get_pp(self, nu1):
        return np.exp(nu1)
    
    """dummy function for the example purpose"""
    def dummy_function(self):
        """do important stuff"""
        return 
    
    """dummy function for the example purpose"""
    def dummy_function_2(self, result):
        """do important stuff"""
        return np.reshape(result, np.inshape)
    
    """dummy function for the example purpose"""
    def dummy_function_3(self):
        """do important stuff"""
        return
    
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_llp(self, ll, lp):
        return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]
        
    """NOTE, lp is not used here for the example purpose but
    in the original code, it's very important variable containg
    relevant data for calculations"""
    def generate(self, lp={}):
        """create a list that is used to the creation of 2-D array"""
        """providing here a dummy pp param to get_ll"""
        ll = self.get_ll(lp)
        ip = self.get_ip()
        
        self.op = self.get_op()
        
        """length of args_tmp = self.length * self.length = 256 * 256"""
        args_tmp = [
            {'self': self, 
             'nu1': nu1,  
             'nu2': nu2, 
             'ii': ii, 
             'jj': jj,
             'innt': np.abs(self.get_innt(nu1, nu2, ip)),
             'nu1exp': np.exp(1j*nu1*ip),
             'nu2exp': np.exp(1j*nu2*ip),
             } for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]
        
        pool = {}
        
        """MAJOR CHANGE IN THIS PART AND BELOW"""
        map_args = [{'self': self, 'idx': (idx, len(ll)), 'll': ll, 'lp': lp, 'pool': pool, 'args_tmp': args_tmp} for idx, ll in enumerate(ll)]

        if self.multiprocessing: 
            pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)
            
            for _ in tqdm.tqdm(pool.imap_unordered(generate_js_, map_args), total=len(map_args)):
                pass
            pool.close()
            pool.join()
            pbar.close()
        else:
            for map_arg in tqdm.tqdm(map_args):
                generate_js_(map_arg)

    def __generate(self, ll, lp, pool = {}, args_tmp = []):
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function()
        
        self.llp = self.get_llp(ll, lp)
        """originally the values is taken from lp"""
        self.rr = self.rr
        
        """REMOVED PARALLEL CALL HERE"""
        result = [wrapper_(arg) for arg in args_tmp]
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        result = self.dummy_function_2(result)
    
    """original function is different"""
    def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii = 0, jj = 0, rr=0):
        if rr == 1 and self.inshape[0] - 1 - jj < ii: 
            return 0
        elif rr == -1 and ii > jj: 
            return 0
        elif rr == 0:
            """do nothing"""
        
        ll1 = []
        ll2 = []
        
        """In the original code, there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function_3()
        
        for kk, ll in enumerate(llp):
            ll1.append(
               self.__get_pp(nu1) * 
               nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
            )
            ll2.append(
               self.__get_pp(nu2) * 
               nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
            )
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result
    
    
    
g = tmp(False, 256, 300)
g.generate()

Спасибо, Ян, снова.

...