Многопроцессорность - потребление памяти - PullRequest
0 голосов
/ 20 июня 2020
• 1000 , но по какой-то причине, несмотря на использование общей памяти, мое потребление памяти резко возрастает при использовании рабочего пула с 96 рабочими. не указал нужные параметры для карты) - все равно прояснил мою текущую проблему.

Есть идеи? Прикрепив снимок экрана htop на моем сервере, чтобы показать использование ЦП + потребление памяти.

enter image description here

For reference, I used the figtree package from here: https://github.com/ec2604/figtree (commit - 7ba197e45a5c6577fab56d469b4b1ccf02242e3d), это разветвленный репозиторий, который портирует C код уровня до python. Не думайте, что это действительно важно, вместо этого вы можете поместить туда любой код, интенсивно использующий процессор.

!!!!!! EDIT !!!!: Оглядываясь назад, пакет figtree выделяет память для результата ( 50000 9995 8) / (1024 ** 3) ГБ на процесс. Если вы умножите это на 96 процессов, это приведет к безумному потреблению памяти.

import figtree
import numpy as np
import multiprocessing
import ctypes
from multiprocessing import Pool, sharedctypes
n = 50000
m = 9995
X_base = sharedctypes.RawArray(ctypes.c_double, n* 77)
X_shared = np.frombuffer(X_base.get_obj())
X_shared = X_shared.reshape(n, 77)
X_shared[:] = np.random.normal(0, 1, (n, 77))
del X_shared

Q_base = sharedctypes.RawArray(ctypes.c_double, m** 2)
Q_shared = np.frombuffer(Q_base.get_obj())
Q_shared = Q_shared.reshape(m, m)
Q_shared[:] = np.random.normal(0, 1, (m, m))
del Q_shared


def fig_helper_efficient(slice):
    print(id(Q_shared))
    Q_shared = np.frombuffer(Q_base)
    Q_shared = Q_shared.reshape(9995, 9995)
    X_shared = np.frombuffer(X_base)
    X_shared = X_shared.reshape(n,77)
    if Q_shared.shape[0] == Q_shared.shape[1]:
        res = figtree.figtree(**{'X': X_shared[slice, :], 'Y': X_shared,
                                 'Q': Q_shared[:, slice].copy(), 'epsilon': 1e-12,
                                 'h': 15})
        print("done")
    return res

def divide_batches_equally(num_examples, num_batches):
    div_result = num_examples // num_batches
    mod_result = num_examples % num_batches
    size = np.zeros((num_batches + 1, 1)).astype(np.int32)
    size[1:] = div_result
    if mod_result > 0:
        size[1:mod_result + 1] += 1
    return np.cumsum(size)


def parallel_fig_vert_efficient():
    n_proc = 96
    size = divide_batches_equally(m, n_proc)
    parallel_list = [slice(int(size[i]), int(size[i + 1])) for i in range(n_proc)]
    with Pool(n_proc) as pool:
        res = pool.map(fig_helper_efficient, parallel_list)
    return res


if __name__ == '__main__':
    parallel_fig_vert_efficient()
...