Какой шаблон python можно использовать для распараллеливания? - PullRequest
0 голосов
/ 20 ноября 2018

cmd - это функция, которая обрабатывает аргумент x и выводит вывод на стандартный вывод.Например, это может быть

def cmd(x):
  print(x)

Последовательная программа, вызывающая cmd(), выглядит следующим образом.

for x in array:
  cmd(x)

Чтобы ускорить программу, я бы хотел, чтобы она работала параллельно,Вывод stdout может быть не в порядке, но вывод одного x не должен прерываться выводом другого x.

Для реализации этого в python могут быть разные способы.Я выясняю что-то вроде этого.

from joblib import Parallel, delayed
Parallel(n_jobs=100)(delayed(cmd)(i) for i in range(100))

Это лучший способ реализовать это в python с точки зрения простоты кода / читаемости и эффективности?

Кроме того, приведенный выше код работает нормально наpython3.Но не на python2, я получил следующую ошибку.Это проблема, которая может вызвать ошибки?

/ Библиотека / Frameworks / Python.framework / Versions / 2.7 / lib / python2.7 / site-packages / joblib / externals / loky / backend / semlock.py:217: RuntimeWarning: семафор разбит на OSX, выпуск может увеличить его максимальное значение («увеличить его максимальное значение», RuntimeWarning)

Спасибо.

Ответы [ 3 ]

0 голосов
/ 20 ноября 2018

в стандартной библиотеке https://docs.python.org/3/library/threading.html

import threading

def cmd(x):
    lock.acquire(blocking=True)
    print(x)
    lock.release()

lock = threading.Lock()

for i in range(100):
    t = threading.Thread(target=cmd, args=(i,))
    t.start()

Использование блокировки гарантирует, что код между lock.acquire() и lock.release() выполняется только одним потоком за раз.print метод уже поточно-ориентированный в python3, поэтому вывод не будет прерван даже без блокировки.Но если у вас есть какое-либо общее состояние между потоками (объект, который они изменяют), вам нужна блокировка.

0 голосов
/ 20 ноября 2018

Я хотел бы подойти к вопросу в вопросе с помощью следующего кода (при условии, что мы говорим об операциях, связанных с процессором):

import multiprocessing as mp
import random


def cmd(value):
    # some CPU heavy calculation
    for dummy in range(10 ** 8):
        random.random()
    # result
    return "result for {}".format(value)


if __name__ == '__main__':
    data = [val for val in range(10)]
    pool = mp.Pool(4)  # 4 - is the number of processes (the number of CPU cores used)
    # result is obtained after the process of all the data
    result = pool.map(cmd, data)

    print(result)

Вывод:

['result for 0', 'result for 1', 'result for 2', 'result for 3', 'result for 4', 'result for 5', 'result for 6', 'result for 7', 'result for 8', 'result for 9']

РЕДАКТИРОВАТЬ - еще одна реализация для получения результата сразу после вычисления - processes и queues вместо pool и map:

import multiprocessing
import random


def cmd(value, result_queue):
    # some CPU heavy calculation
    for dummy in range(10 ** 8):
        random.random()
    # result
    result_queue.put("result for {}".format(value))


if __name__ == '__main__':

    data = [val for val in range(10)]
    results = multiprocessing.Queue()

    LIMIT = 3  # 3 - is the number of processes (the number of CPU cores used)
    counter = 0
    for val in data:
        counter += 1
        multiprocessing.Process(
            target=cmd,
            kwargs={'value': val, 'result_queue': results}
            ).start()
        if counter >= LIMIT:
            print(results.get())
            counter -= 1
    for dummy in range(LIMIT - 1):
        print(results.get())

Вывод:

result for 0
result for 1
result for 2
result for 3
result for 4
result for 5
result for 7
result for 6
result for 8
result for 9
0 голосов
/ 20 ноября 2018

Если вы используете python3, вы можете вместо этого

использовать concurrent.futures из стандартной библиотеки *1003*

with concurrent.futures.ProcessPoolExecutor(100) as executor:
     for x in array:
         executor.submit(cmd, x)
.
...