Параллелизировать вложенный цикл for в IPython - PullRequest
6 голосов
/ 20 февраля 2012

В моем коде Python есть вложенный цикл for, который выглядит примерно так:

results = []

for azimuth in azimuths:
    for zenith in zeniths:
        # Do various bits of stuff
        # Eventually get a result
        results.append(result)

Я бы хотел распараллелить этот цикл на моей 4-х ядерной машине, чтобы ускорить его. Глядя на документацию по параллельному программированию IPython (http://ipython.org/ipython-doc/dev/parallel/parallel_multiengine.html#quick-and-easy-parallelism), кажется, что существует простой способ использовать map для распараллеливания итерационных операций.

Однако для этого мне нужно иметь код внутри цикла в виде функции (что легко сделать), а затем отобразить эту функцию. У меня проблема в том, что я не могу получить массив для сопоставления этой функции. itertools.product() создает итератор, с которым я не могу использовать функцию map.

Я лаю не на том дереве, пытаясь использовать карту здесь? Есть ли лучший способ сделать это? Или есть какой-то способ использовать itertools.product, а затем выполнять параллельное выполнение с функцией, сопоставленной с результатами?

Ответы [ 5 ]

10 голосов
/ 21 февраля 2012

Чтобы распараллелить каждый вызов, вам просто нужно получить список для каждого аргумента. Вы можете использовать itertools.product + zip, чтобы получить это:

allzeniths, allazimuths = zip(*itertools.product(zeniths, azimuths))

Тогда вы можете использовать карту:

amr = dview.map(f, allzeniths, allazimuths)

Чтобы немного углубиться в шаги, вот пример:

zeniths = range(1,4)
azimuths = range(6,8)

product = list(itertools.product(zeniths, azimuths))
# [(1, 6), (1, 7), (2, 6), (2, 7), (3, 6), (3, 7)]

Итак, у нас есть «список пар», но что нам действительно нужно, так это один список для каждого аргумента, то есть «пара списков». Это именно то, что немного странный синтаксис zip(*product) дает нам:

allzeniths, allazimuths = zip(*itertools.product(zeniths, azimuths))

print allzeniths
# (1, 1, 2, 2, 3, 3)
print allazimuths
# (6, 7, 6, 7, 6, 7)

Теперь мы просто отобразим нашу функцию на эти два списка, чтобы распараллелить вложенные циклы:

def f(z,a):
    return z*a

view.map(f, allzeniths, allazimuths)

И ничего особенного в том, что их всего два - этот метод должен распространяться на произвольное количество вложенных циклов.

9 голосов
/ 06 марта 2012

Я предполагаю, что вы используете IPython 0.11 или более позднюю версию.Прежде всего определите простую функцию.

def foo(azimuth, zenith):
    # Do various bits of stuff
    # Eventually get a result
    return result

, а затем используйте прекрасный параллельный пакет IPython для распараллеливания вашей проблемы.сначала запустите контроллер с 5 подключенными модулями (#CPUs + 1), запустив кластер в окне терминала (если вы установили IPython 0.11 или более позднюю версию, эта программа должна присутствовать):

ipcluster start -n 5

В вашем скрипте connectна контроллер и передать все ваши задачи.Контроллер позаботится обо всем.

from IPython.parallel import Client

c = Client()   # here is where the client establishes the connection
lv = c.load_balanced_view()   # this object represents the engines (workers)

tasks = []
for azimuth in azimuths:
    for zenith in zeniths:
        tasks.append(lv.apply(foo, azimuth, zenith))

result = [task.get() for task in tasks]  # blocks until all results are back
1 голос
/ 07 февраля 2019

Если вы хотите сохранить структуру вашего цикла, вы можете попробовать использовать Ray ( docs ), который является основой для написания параллельного и распределенного Python. Единственное требование заключается в том, что вы должны выделить работу, которую можно распараллелить, в ее собственную функцию.

Луч можно импортировать следующим образом:

import ray

# Start Ray. This creates some processes that can do work in parallel.
ray.init()

Тогда ваш скрипт будет выглядеть так:

# Add this line to signify that the function can be run in parallel (as a
# "task"). Ray will load-balance different `work` tasks automatically.
@ray.remote
def work(azimuth, zenith):
  # Do various bits of stuff
  # Eventually get a result
  return result

results = []

for azimuth in azimuths:
    for zenith in zeniths:
        # Store a future, which represents the future result of `work`.
        results.append(work.remote(azimuth, zenith))

# Block until the results are ready with `ray.get`.
results = ray.get(results)
1 голос
/ 03 февраля 2015

Если вы действительно хотите запустить свой код параллельно, используйте concurrent.futures

import itertools
import concurrent.futures

def _work_horse(azimuth, zenith):
    #DO HEAVY WORK HERE
    return result

futures = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for arg_set in itertools.product(zeniths, azimuths):
        futures.append(executor.submit(_work_horse, *arg_set))
executor.shutdown(wait=True)

# Will time out after one hour.
results = [future.result(3600) for future in futures]
1 голос
/ 20 февраля 2012

Я не очень знаком с IPython, но простое решение, похоже, состоит в том, чтобы распараллелить только внешний цикл.

def f(azimuth):
    results = []
    for zenith in zeniths:
        #compute result
        results.append(result)
    return results

allresults = map(f, azimuths)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...