Python многопроцессорный pool.map для нескольких аргументов - PullRequest
400 голосов
/ 26 марта 2011

В многопроцессорной библиотеке Python есть вариант pool.map, который поддерживает несколько аргументов?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()

Ответы [ 18 ]

409 голосов
/ 26 марта 2011

есть ли вариант pool.map, который поддерживает несколько аргументов?

Python 3.3 включает в себя pool.starmap() метод :

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

Для более старых версий:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

выход

1 1
2 1
3 1

Обратите внимание, как здесь используются itertools.izip() и itertools.repeat().

Из-за ошибки, упомянутой @ unutbu , вы не можете использовать functools.partial() или аналогичные возможности в Python 2.6, поэтому простая функция оболочки func_star() должна быть определена явно , См. Также обходной путь , предложенный uptimebox.

263 голосов
/ 26 марта 2011

Ответ на этот вопрос зависит от версии и ситуации. Наиболее общий ответ для последних версий Python (начиная с 3.3) был впервые описан ниже J.F. Себастьян . 1 Используется метод Pool.starmap, который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их данной функции:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Для более ранних версий Python вам нужно написать вспомогательную функцию для явной распаковки аргументов. Если вы хотите использовать with, вам также нужно написать оболочку, чтобы превратить Pool в менеджер контекста. (Спасибо muon за указание на это.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

В более простых случаях с фиксированным вторым аргументом вы также можете использовать partial, но только в Python 2.7 +.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Во многом это было вдохновлено его ответом, который, вероятно, следовало бы принять вместо этого. Но так как эта книга застряла наверху, лучше всего ее улучшить для будущих читателей.

123 голосов
/ 15 января 2014

думаю ниже будет лучше

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

выход

[3, 5, 7]
46 голосов
/ 11 марта 2015

Использование Python 3.3 + с pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Результат:

1 --- 4
2 --- 5
3 --- 6

Вы также можете использовать zip () больше аргументов, если хотите: zip(a,b,c,d,e)

Если вы хотите, чтобы в качестве аргумента передавалось постоянное значение, вы должны использовать import itertools, а затем, например, zip(itertools.repeat(constant), a).

22 голосов
/ 23 января 2014

Узнав об itertools в ответе JF Sebastian , я решил пойти еще дальше и написать пакет parmap, который заботится о распараллеливании, предлагая функции map и starmap для python-2.7 и python-3.2 (а также позже), которые могут принимать любое число позиционных аргументов.

Установка

pip install parmap

Как распараллелить:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Я загрузил parmap в PyPI и в репозиторий github .

Например, на вопрос можно ответить следующим образом:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
9 голосов
/ 21 января 2014

Есть форк multiprocessing, называемый pathos ( note: используйте версию на github ), который не требует starmap - функции карты отражают API Карта Python, таким образом, карта может принимать несколько аргументов. С pathos вы также можете выполнять многопроцессорную обработку в интерпретаторе вместо того, чтобы застрять в блоке __main__. После небольшого обновления ожидается выпуск Pathos - в основном, переход на Python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]
8 голосов
/ 27 июня 2014

Вы можете использовать следующие две функции, чтобы избежать написания оболочки для каждой новой функции:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Используйте функцию function со списками аргументов arg_0, arg_1 и arg_2 следующим образом:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
7 голосов
/ 21 ноября 2016

Другая простая альтернатива - заключить параметры вашей функции в кортеж, а затем обернуть параметры, которые также должны быть переданы в кортежи. Это, возможно, не идеально, когда имеешь дело с большими кусками данных. Я считаю, что он будет делать копии для каждого кортежа.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Дает вывод в некотором случайном порядке:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
7 голосов
/ 23 мая 2017

Лучшее решение для python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

out []:

[3, 5, 7]

6 голосов
/ 29 мая 2016

Лучше использовать декоратор вместо написания функции-оболочки от руки.Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая написания оболочки для каждой функции.Обычно декорированная функция не является кражей, однако мы можем использовать functools, чтобы обойти ее.Больше рассуждений можно найти здесь .

Здесь приведен пример

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Тогда вы можете отобразить его с помощью сжатых аргументов

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

КонечноВы всегда можете использовать Pool.starmap в Python 3 (> = 3.3), как указано в других ответах.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...