Как распараллелить простой цикл Python? - PullRequest
178 голосов
/ 20 марта 2012

Это, вероятно, тривиальный вопрос, но как мне распараллелить следующий цикл в python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Я знаю, как запускать отдельные потоки в Python, но я не знаю, как "собрать" результаты.

Многократные процессы тоже подойдут - что будет проще для этого случая. В настоящее время я использую Linux, но код также должен работать на Windows и Mac.

Какой самый простой способ распараллелить этот код?

Ответы [ 11 ]

140 голосов
/ 20 марта 2012

Использование нескольких потоков в CPython не даст вам лучшей производительности для чистого Python-кода из-за глобальной блокировки интерпретатора (GIL). Я предлагаю вместо этого использовать модуль multiprocessing:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Обратите внимание, что в интерактивном переводчике это не будет работать.

Чтобы избежать обычного FUD вокруг GIL: использование потоков для этого примера в любом случае не даст никакого преимущества. Вы хотите, чтобы использовал здесь процессы, а не потоки, потому что они избегают целой кучи проблем.

47 голосов
/ 17 сентября 2015

Чтобы распараллелить простой цикл for, joblib приносит много пользы необработанному использованию многопроцессорной обработки.Не только короткий синтаксис, но и такие вещи, как прозрачное группирование итераций, когда они выполняются очень быстро (для устранения накладных расходов) или захват трассировки дочернего процесса, для улучшения отчетов об ошибках.

Отказ от ответственности: IЯ являюсь первоначальным автором joblib.

37 голосов
/ 05 мая 2017

Какой самый простой способ распараллелить этот код?

Мне действительно нравится concurrent.futures для этого, доступный в Python3 начиная с версии 3.2 - и через backport для2.6 и 2.7 на PyPi .

Вы можете использовать потоки или процессы и использовать точно такой же интерфейс.

Многопроцессорная обработка

Поместите это в файл -futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

А вот вывод:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Многопоточность

Теперь измените ProcessPoolExecutor на ThreadPoolExecutor и снова запустите модуль:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Теперь вы выполнили как многопоточность, так и многопроцессорность!

Примечание о производительности и использовании обоих вместе.

Выборка слишком мала для сравнения результатов.

Однако я подозреваю, что многопоточность будет быстрее, чем многопроцессорная в целом,особенно в Windows, поскольку Windows не поддерживает разветвление, поэтому для запуска каждого нового процесса требуется время.В Linux или Mac они, вероятно, будут ближе.

Вы можете вкладывать несколько потоков в несколько процессов, но рекомендуется не использовать несколько потоков для выделения нескольких процессов.

20 голосов
/ 19 июня 2018
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

Вышеописанное прекрасно работает на моей машине (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib).

Взято из https://blog.dominodatalab.com/simple-parallelization/

8 голосов
/ 30 августа 2018

Использование Ray имеет ряд преимуществ:

  • Вы можете распараллеливать на нескольких машинах помимо нескольких ядер (с одним и тем же кодом).
  • Эффективная обработка числовых данных через общую память (и сериализацию без копирования).
  • Высокая пропускная способность при распределенном планировании.
  • Отказоустойчивость.

ВВ вашем случае вы можете запустить Ray и определить удаленную функцию

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

, а затем вызвать ее параллельно

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

Чтобы запустить тот же пример на кластере, единственная строка, которая будетизменение будет вызовом ray.init ().Соответствующую документацию можно найти здесь .

Обратите внимание, что я помогаю в разработке Ray.

4 голосов
/ 20 марта 2012

почему вы не используете потоки и один мьютекс для защиты одного глобального списка?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

имейте в виду, вы будете так же быстро, как ваша самая медленная нить

3 голосов
/ 28 марта 2019

Я нашел joblib очень полезно со мной.Пожалуйста, смотрите следующий пример:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs = -1: использовать все доступные ядра

2 голосов
/ 10 мая 2018

очень простой пример параллельной обработки:

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()
1 голос
/ 14 декабря 2018

Допустим, у нас есть асинхронная функция

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

Это должно выполняться на большом массиве. Некоторые атрибуты передаются программе, а некоторые используются из свойства элемента словаря в массиве.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))
1 голос
/ 22 октября 2015

Это может быть полезно при реализации многопроцессорных и параллельных / распределенных вычислений в Python.

Учебник YouTube по использованию пакета techila

Techila - это промежуточное программное обеспечение для распределенных вычислений, которое напрямую интегрируется с Python с помощью пакета techila. Функция персика в пакете может быть полезна для распараллеливания структур цикла. (Следующий фрагмент кода взят из форумов сообщества Techila )

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...