Распараллеливание функции с несколькими аргументами списков с помощью многопроцессорной обработки Python - PullRequest
0 голосов
/ 07 мая 2018

Я надеюсь, что это не дубликат, но я не смог найти полностью удовлетворительный ответ для этой конкретной проблемы.

Дана функция с несколькими аргументами списка и одним итератором, например, здесь с двумя списками

def function(list1, list2, iterable):
    i1 = 2*iterable
    i2 = 2*iterable+1
    list1[i1] *= 2
    list2[i2] += 2
    return(list1, list2)

Каждый список принимается в разных записях, поэтому операции разделяются и могут быть парализованы. Каков наилучший способ сделать это с помощью многопроцессорной обработки Python?

Один простой способ распараллеливания - использование функции map:

import multiprocessing as mp
from functools import partial

list1, list2 = [1,1,1,1,1], [2,2,2,2,2]
func = partial(function, list1, list2)
pool = mp.Pool()
pool.map(func, [0,1])

Проблема в том, что если кто-то так делает, он создает для каждого процесса копию списков (если я правильно понимаю функцию карты) и параллельно работаю в разных местах в этих копиях. В конце (после касания двух итераций [0,1]) результат pool.map будет

[([3, 1, 1, 1, 1], [2, 4, 2, 2, 2]), ([1, 1, 3, 1, 1], [2, 2, 2, 4, 2])]

но я хочу

[([3, 1, 3, 1, 1], [2, 4, 2, 4, 2])].

Как этого добиться? Стоит ли разделить список по итеративным ранее, выполнить конкретные операции параллельно, а затем объединить их снова?

Заранее спасибо и извините, если я что-то перепутал, я только начал использовать мультипроцессорную библиотеку.

РЕДАКТИРОВАТЬ: Операции над различными частями в списке могут быть парализованы без синхронизации, операции над всем списком не могут быть парализованы (без синхронизации). Поэтому решение моей конкретной проблемы состоит в том, чтобы разбить списки и функцию на операции и на части списков. После этого объединяются части списков, чтобы получить весь список обратно.

Ответы [ 2 ]

0 голосов
/ 16 мая 2018

Вот решение проблемы. Я не знаю, если это лучший способ, но он работает:

import multiprocessing as mp
from functools import partial

def operation1(lst, pos)
    return(pos, lst[pos] * 2)

def operation2(lst, pos)
    return(pos, lst[pos] + 2)

if __name__ == "__main__":
    list1, list2 = [1,1,1,1,1], [2,2,2,2,2]
    iterable = [0,1]
    index1_list = [2*i for i in iterable]
    index2_list = [2*i+1 for i in iterable]

    func1 = partial(operation1, list1)
    func2 = partial(operation2, list2)

    with mp.Pool() as pool:
        result1 = pool.map(func1, index1_list)
        result2 = pool.map(func2, index2_list)

    for result in result1:
        list1[result[0]] = result[1]

    for result in result2:
        list2[result[0]] = result[1]

    print(list1, list2)
0 голосов
/ 16 мая 2018

Вы не можете делить память между процессами (технически это возможно в системах на основе форка, если вы не меняете объекты / не влияете на счетчик ссылок, что редко случается в реальных условиях) - вы можете либо использовать общая структура (большинство из них доступны в multiprocessing.Manager()), которая будет выполнять синхронизацию / обновления для вас или передавать только данные, необходимые для обработки, а затем объединять результат.

Ваш пример достаточно прост для того, чтобы оба подхода работали без серьезных штрафов, поэтому я бы просто пошел с менеджером:

import multiprocessing
import functools

def your_function(list1, list2, iterable):
    i1 = 2 * iterable
    i2 = 2 * iterable + 1
    list1[i1] *= 2
    list2[i2] += 2

if __name__ == "__main__":  # a multi-processing guard for cross-platform use
    manager = multiprocessing.Manager()
    l1 = manager.list([1, 1, 1, 1, 1])
    l2 = manager.list([2, 2, 2, 2, 2])
    func = functools.partial(your_function, l1, l2)
    pool = multiprocessing.Pool()
    pool.map(func, [0, 1])
    print(l1, l2)  # [2, 1, 2, 1, 1] [2, 4, 2, 4, 2]

Или, если ваш вариант использования более удобен для сшивания данных после обработки:

import multiprocessing
import functools

def your_function(list1, list2, iterable):
    i1 = 2 * iterable
    i2 = 2 * iterable + 1
    return (i1, list1[i1] * 2), (i2, list2[i2] + 2)  # return the changed index and value

if __name__ == "__main__":  # a multi-processing guard for cross-platform use
    l1 = [1, 1, 1, 1, 1]
    l2 = [2, 2, 2, 2, 2]
    func = functools.partial(your_function, l1, l2)
    pool = multiprocessing.Pool()
    results = pool.map(func, [0, 1])
    for r1, r2 in results:  # stitch the results back into l1 and l2
        l1[r1[0]] = r1[1]
        l2[r2[0]] = r2[1]
    print(l1, l2)  # [2, 1, 2, 1, 1] [2, 4, 2, 4, 2]

При этом вывод не соответствует тому, что вы перечислили / ожидали, а то, что должно произойти в зависимости от вашей функции.

Кроме того, если ваш случай настолько прост, вы можете полностью отказаться от многопроцессорной обработки - дополнительные накладные расходы многопроцессорной обработки (плюс синхронизация менеджера) не стоят того, если your_function() не выполняет какую-то действительно интенсивную работу ЦП.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...