Python: Как я могу запустить функции Python параллельно? - PullRequest
77 голосов
/ 26 августа 2011

Сначала я исследовал и не смог найти ответ на свой вопрос. Я пытаюсь запустить несколько функций параллельно в Python.

У меня есть что-то вроде этого:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Я хочу вызвать func1 и func2 и запустить их одновременно. Функции не взаимодействуют друг с другом или на одном объекте. Сейчас мне нужно дождаться завершения func1, прежде чем func2 запустится. Как мне сделать что-то вроде ниже:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Я хочу иметь возможность создавать обе директории практически одновременно, потому что каждую минуту я считаю, сколько файлов создается. Если директории там нет, это скинет мне время.

Ответы [ 5 ]

120 голосов
/ 26 августа 2011

Вы можете использовать threading или multiprocessing.

Из-за особенностей CPython , threading вряд ли достигнет истинного параллелизма. По этой причине multiprocessing, как правило, лучше.

Вот полный пример:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

Механизм запуска / объединения дочерних процессов может быть легко инкапсулирован в функцию в соответствии с вашей runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
5 голосов
/ 03 февраля 2019

Это можно сделать элегантно с Ray , системой, которая позволяет вам легко распараллеливать и распространять ваш код Python.

Чтобы распараллелить ваш пример, вам нужно определить свои функции с помощью декоратора @ray.remote, а затем вызвать их с помощью .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Если вы передаете один и тот же аргумент обеим функциям, а аргумент велик, более эффективный способ сделать это - использовать ray.put(). Это позволяет избежать сериализации большого аргумента и создания двух его копий в памяти:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Если func1() и func2() возвращают результаты, вам необходимо переписать код следующим образом:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Существует ряд преимуществ использования Ray по сравнению с многопроцессорным модулем . В частности, тот же код будет работать как на одной машине, так и на кластере машин. Для получения дополнительных преимуществ Рэй см. этот пост .

3 голосов
/ 10 мая 2016

Если вы являетесь пользователем Windows и используете Python 3, то этот пост поможет вам выполнять параллельное программирование в python. Когда вы запустите обычное программирование пула многопроцессорной библиотеки, вы получите ошибку, касающуюся основной функции в вашей программе.Это связано с тем, что в Windows нет функции fork ().Следующий пост дает решение указанной проблемы.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Поскольку я использовал python 3, я немного изменил программу следующим образом:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

После этой функции приведенный выше код проблемытакже немного изменилось так:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

И я получил вывод:

[1, 8, 27, 64, 125, 216]

Я думаю, что этот пост может быть полезен для некоторых пользователей Windows.

3 голосов
/ 26 августа 2011

Невозможно гарантировать, что две функции будут выполняться синхронно друг с другом, что, по-видимому, является тем, что вы хотите сделать.

Лучшее, что вы можете сделать, это разделить функцию на несколько шагов, а затемдождитесь окончания обоих в критических точках синхронизации, используя Process.join, как упоминается в ответе @ aix.

Это лучше, чем time.sleep(10), потому что вы не можете гарантировать точное время.С явным ожиданием вы говорите, что функции должны быть выполнены, выполняя этот шаг перед переходом к следующему, вместо того, чтобы предполагать, что он будет выполнен в течение 10 мс, что не гарантируется в зависимости от того, что еще происходит на машине.

1 голос
/ 15 мая 2019

Если ваши функции в основном выполняют работу ввода-вывода (и меньше загрузки процессора) и у вас есть Python 3.2+, вы можете использовать ThreadPoolExecutor :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Если ваши функции в основном выполняют ЦП (и меньше операций ввода-вывода) и у вас есть Python 2.6+, вы можете использовать модуль multiprocessing :

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...