Многопроцессорное использование Python для высокой оперативной памяти даже после вызова pool.close и pool.join - PullRequest
0 голосов
/ 16 октября 2018

Я должен выполнить распараллеливание кода, который читает строку из файла параметров, выполняет некоторую распараллеленную работу и затем читает следующую строку, пока файл не закончится.Я сделал это:

def unpack(func):
    @wraps(func)
    def wrapper(arg_tuple):
        return func(*arg_tuple)
    return wrapper

@unpack
def parallel_job(seed,distributioncsv,shift):
    #for each core, create a different file, use different seeds and start
    f = open(distributioncsv,'w+')
    random.seed(seed)
    np.random.seed(seed)
    #number of simulation each core should make
    threadsim = simnum/threadnum
    for i in range (0,threadsim):
          ...do stuff

Моя основная задача такова: я читаю файл, перебираю строки и вызываю многопроцессорность.Сначала я определю некоторые константы:

if __name__ == '__main__':
    #number of simulations, and number of threads to use
    threadnum = 10

    simnum = threadnum*10

    #order in file: Network, N, lambda, gamma, k, i0, tauf, folder
    N_f, lamma_f, gamma_f,k_f, i0_f, tauf_f = np.loadtxt("parameters.txt", delimiter=',', dtype = np.float, usecols =[1,2,3,4,5,6], unpack = True)
    folder_f, networkchoice_f =   np.loadtxt("parameters.txt", delimiter=',', dtype = np.str, usecols =[7,0], unpack = True)

    for i in range(0,len(N_f)):
        #number of nodes
        N = N_f[i]
        #per node infection probability 
        lamma = lamma_f[i]
        #per node recovery probability
        gamma = gamma_f[i]
        #average network degree or number of new links per node
        k = int(k_f[i])
        #initial number of infected nodes
        i0 = int(i0_f[i])
        #tauend of simulations
        tauf = tauf_f[i]
        #folder where to save files
        folder =  os.getenv("HOME")+folder_f[i]
        #Network to simulate
        networkchoice = networkchoice_f[i]

        #where to put the sum of all the distributions
        distributioncsv = folder +"/distribution.csv"

        #where to put all the figures
        destinationofallfigures = folder+"/Figures/a(k)/"
        #file for the k - E(k) values
        akfile = folder+'/csv/E(ak).csv'
        #plot the mean epidemics from simulations (t, I)
        avgepidemics = folder+"/Figures/I(t)/average"
        #columns name
        name = ['I', 'SI', 'deltas','t', 'run']
        saveplots = folder+"/Figures/"
        #file for the mean average
        averagecsv = folder+"/csv/average"

        #different seed for each thread
        seed = [j*2759 + 37*j**2 + 4757 for j in range(threadnum)]
        #to enumerate my runs without loosing track of them   
        shift=[j*simnum for j in range(simnum)]
        #vector with the name of the files to be created
        distribution = [folder+"/csv/distribution_%d.csv" %j for j in range(threadnum)]

Вот соответствующая часть о распараллеливании

        arguments = zip(seed,distribution,shift)

        #print arguments


        #begin parallelization

        pool = multiprocessing.Pool(threadnum)

        #spawn threadnum threads and give them parallel jobs
        pool.map(parallel_job, iterable=arguments)

        pool.close()
        # close the parallelization waiting for all the thread to be done
        pool.join()
        ... do other unparallelized stuff and end the loop

Каждый раз, когда цикл заканчивается, я ожидаю уменьшения использования памяти, потому что в какой-то моментВызываются pool.close () и pool.join ().

Вместо этого случается, что цикл за циклом продолжает увеличиваться использование памяти.

Может быть, потому что моя функция parallel_job не имеет возвращаемого значения?я должен вернуть None в конце parallel_job?В настоящее время я ничего не возвращаю.

РЕДАКТИРОВАТЬ: Я сейчас измеряю, как увеличивается использование оперативной памяти.К сожалению, процесс занимает много времени.В прошлый раз, когда я запустил этот процесс, через 4 часа он израсходовал все доступные оперативной памяти и подкачки (30 ГБ) моего компьютера.

Если я запускаю непараллельную версию этой программы, каждый цикл потребляет ~ 3 ГБ ОЗУ.

...