Как применить пользовательскую функцию для параллельного объединения элементов массива? - PullRequest
2 голосов
/ 08 мая 2019

У меня есть массив, и я хочу сравнить каждый элемент с каждым другим элементом и построить таблицу перекрестного сравнения. Он может быть легко реализован с помощью вложенных циклов, но его время вычислений увеличивается экспоненциально с размером входного массива, поэтому я хочу реализовать подход параллельной обработки, чтобы сократить потребление времени при больших размерах.

У меня есть массив, например a = [1,2,3], и я хочу применить пользовательскую функцию, например:

def add_two_numbers(x,y):
     return x+y

Простая вложенная реализация цикла будет выглядеть так:

array = [1,2,3]
matrix = np.zeros([3,3])
for i, one_element in enumerate(array):
    for j, other_element in enumerate(array):
        matrix[i][j] = add_two_numbers(one_element, other_element)

С выводом:

>>> matrix
    1   2   3
______________
1 | 2   3   4
2 | 3   4   5
3 | 4   5   6

Какой будет хороший подход для применения параллельной обработки в python для больших массивов?
Я использовал класс процесса в многопроцессорной библиотеке Python для создания n процессов для массива n элементов, но каждый процесс открывает файл на бэкэнде, и после 1024 параллельных процессов я получаю исключение «Слишком много открытых файлов». И я должен сделать матрицу глобальной переменной, чтобы каждый процесс обновлял определенный элемент.

import multiprocessing as mp

def add_two_numbers_process(one_element, array, i):
    global matrix
    for j, other_element in enumerate(array):
        matrix[i][j] = add_two_numbers(one_element, other_element)
    return

processes = []
for i, one_element in enumerate(array):
    p = mp.Process(target=add_two_numbers_process, args=(one_element, array, i))
    processes.append(p)
    p.start()

for process in processes:
    process.join()

Я также использовал класс Pool, но это занимает в 1000 раз больше времени, чем класс процесса, что не представляется возможным.

import multiprocessing as mp

def add_two_numbers_pool(one_element, array, i):
    row = [0 for x in range(len(array))]
    for j, other_element in enumerate(array):
        row[j] = add_two_numbers(one_element, other_element)
    return row

pool = mp.Pool(mp.cpu_count())
matrix = [pool.apply(add_two_numbers_pool, args=(one_element, array, i)) for i, one_element in enumerate(array)]
pool.close()

Я не могу придумать подход с использованием dask. Может ли распределенный dask оказаться полезным в этом случае?

1 Ответ

3 голосов
/ 08 мая 2019

в качестве демонстрации использования многопроцессорной обработки и разницы в векторизации по сравнению с нет, мы можем начать с определения / извлечения общего кода:

from multiprocessing import Pool

import numpy as np

def add_two_numbers(x,y):
     return x+y

# use a large number of values so processing takes some measurable amount of time
values = np.arange(3001)

тогда мы можем сделать вашу наивную вещь:

result = np.empty([len(values)]*2, values.dtype)
for i, x in enumerate(values):
    for j, y in enumerate(values):
        result[i,j] = add_two_numbers(x, y)

, что занимает ~ 3,5 секунды на моем ноутбуке. затем мы можем переместить это к использованию multiprocessing Pool с:

def process_row(x):
    output = np.empty_like(values)
    for i, y in enumerate(values):
        output[i] = add_two_numbers(x, y)
    return output

with Pool() as pool:
    result = np.array(pool.map(process_row, values))

, что занимает у меня около 1 секунды, затем мы можем векторизовать это в Pool с помощью:

def process_row_vec(x):
    return add_two_numbers(values, x)

with Pool() as pool:
    result = np.array(pool.map(process_row_vec, values))

, что занимает 0,25 секунды, и, наконец, мы можем использовать полностью векторизованную версию numpy:

x, y = np.meshgrid(values, values)
result = add_two_numbers(x, y)

, что занимает ~ 0,09 секунды (90 мс). Я также понял, что при работе с таким большим количеством элементов эти промежуточные массивы (x и y) занимают значительное количество времени вычислений, и векторизация по строкам происходит быстрее:

result = np.empty([len(values)]*2, values.dtype)
for i, x in enumerate(values):
    result[i,:] = add_two_numbers(x, values)

занимает 0,05 секунды (50 мс).

надеюсь, что эти примеры дадут вам некоторые идеи о том, как реализовать ваш алгоритм!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...