Многопроцессорная обработка нескольких больших массивов numpy в качестве разделяемой памяти - PullRequest
2 голосов
/ 14 апреля 2020

У меня есть несколько больших numpy массивов:

x1=np.zeros((4000,4000))
x2=np.zeros((4000,4000))
x3=np.zeros((4000,4000))
.
.
.
xn=np.zeros((4000,4000))

, и я хочу выполнить функцию с этими массивами параллельно. Поскольку каждый массив не зависит от других, я подумал, что мог бы использовать shared_memory, чтобы подпроцесс не обрабатывал данные.

Можно ли создать большую «разделяемую переменную», которая содержит 3 больших numpy массивы?

Внутри подпроцесса я хотел бы записать непосредственно в эти массивы (без их перемалывания).

Я думаю, что a передал бы подпроцессам idx (0,1,2 ... n) аргумент для ссылки на массивы x1, x2, x3 ... xn?

Возможно ли это? Я думаю, что один массив не проблема, но многопроцессорная обработка нескольких массивов меня немного смущает.

Спасибо.

Ответы [ 2 ]

1 голос
/ 14 апреля 2020

Вот как вы могли бы сделать это, используя массив разделяемой памяти.

import numpy as np
import ctypes
from multiprocessing.sharedctypes import RawArray
from multiprocessing.pool import Pool

def main():
    n = ...  # Number of arrays
    # Make arrays
    x1 = np.zeros((4000, 4000), dtype=np.float64)
    x2 = np.zeros((4000, 4000), dtype=np.float64)
    # ...
    xn = np.zeros((4000, 4000), dtype=np.float64)
    # Make big array of shared memory (ctype must match array type)
    array_mem = RawArray(ctypes.c_double, n * 4000 * 4000)
    arr = np.frombuffer(array_mem, dtype=np.float64).reshape(n, 4000, 4000)
    arr[0] = x1
    arr[1] = x2
    # ...
    arr[n - 1] = xn
    # Process array in a pool of processes
    with Pool(initializer=init_process, initargs=(array_mem, arr.shape)) as p:
        p.map(process_array, range(n))
    # The array has been processed
    # ...
    print(*arr[:, :2, :3], sep='\n')
    # [[0. 0. 0.]
    #  [0. 0. 0.]]
    # [[100. 100. 100.]
    #  [100. 100. 100.]]
    # [[200. 200. 200.]
    #  [200. 200. 200.]]
    # ...

# Global values for subprocesses
process_array_mem = None
process_array_shape = None

# Process initializer saves memory pointer and array shape
def init_process(array_mem, array_shape):
    global process_array_mem, process_array_shape
    process_array_mem = array_mem
    process_array_shape = array_shape

def process_array(array_idx):
    # Create array from shared memory
    arr = np.frombuffer(process_array_mem, dtype=np.float64).reshape(process_array_shape)
    # Pick array for this process
    process_array = arr[array_idx]
    # Do processing
    process_array += 100 * array_idx

if __name__ == '__main__':
    main()

В приведенном выше коде я поставил n = ..., чтобы установить количество массивов на любое значение, которое оно имеет в вашем случае, но если вы измените его на n = 3 и сохраните фрагмент как файл, вы можете запустить его и посмотреть результат. Часть инициализатора и глобальных значений может быть немного запутанной, но дело в том, что array_mem должен наследоваться подпроцессами, что означает, что я не могу передать его как другой параметр с map, и я думаю, что это самый простой способ использовать это.

0 голосов
/ 14 апреля 2020

Вы можете использовать multiprocessing.Pool для одновременной обработки каждого массива, применяя пользовательскую функцию обработки к каждому отдельному массиву в отдельности. Этого можно достичь с помощью функции map объекта пула.

map(func, iterable):

Применить func к каждому элементу в iterable , собирая результаты в возвращаемый список.

Рассмотрим пример кода,

from multiprocessing import Pool

def process_array(arr):
    # ---> TODO: Process array
    arr += 1 # ---> e.g. add the scalar to entire array

    return arr # return processed array

if __name__ == "__main__":
    x1=np.zeros((4000,4000))
    x2=np.zeros((4000,4000))
    x3=np.zeros((4000,4000))

    with Pool() as pool:
        result = pool.map(process_array, [x1, x2, x3]) #--> all the three arrays will be processed parallely.

    print(result[0])
    print(result[1])
    print(result[2])

Пример вывода:

[[1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 ...
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]]

[[1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 ...
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]]

[[1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 ...
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...