Выполнение задач параллельно в Python - PullRequest
22 голосов
/ 23 ноября 2011

Я использую Python 2.7, у меня есть код, который выглядит следующим образом:

task1()
task2()
task3()
dependent1()

task4()
task5()
task6()
dependent2()

dependent3()

Единственные зависимости здесь следующие: зависимый1 должен ждать задач 1-3, зависимый2 должен ждать задач 4-6, а зависимый3 должен ждать зависимостей1-2 ... Следующее было бы хорошо: выполнение всего 6 заданий сначала параллельно, затем первые два иждивенца параллельно .. затем окончательный иждивенец

Я предпочитаю, чтобы как можно больше задач выполнялось параллельно, я гуглил некоторые модули, но я надеялся избежать внешних библиотек и не уверен, как метод Queue-Thread может решить мою проблему (может быть, кто-то может порекомендовать хороший ресурс?)

Ответы [ 3 ]

32 голосов
/ 23 ноября 2011

Встроенный threading.Thread класс предлагает все, что вам нужно: start для начала нового потока и присоединение для ожидания конца потока.

import threading

def task1():
    pass
def task2():
    pass
def task3():
    pass
def task4():
    pass
def task5():
    pass
def task6():
    pass

def dep1():
    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)
    t3 = threading.Thread(target=task3)

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()

def  dep2():
    t4 = threading.Thread(target=task4)
    t5 = threading.Thread(target=task5)

    t4.start()
    t5.start()

    t4.join()
    t5.join()

def dep3():
    d1 = threading.Thread(target=dep1)
    d2 = threading.Thread(target=dep2)

    d1.start()
    d2.start()

    d1.join()
    d2.join()

d3 = threading.Thread(target=dep3)
d3.start()
d3.join()

В качестве альтернативы для присоединения вы можете использовать Queue.join , чтобы дождаться окончания потока.

2 голосов
/ 07 февраля 2019

Если вы хотите дать возможность внешним библиотекам, вы можете элегантно выразить задачи и их зависимости с помощью Ray .Это хорошо работает на одной машине, преимущество в том, что параллелизм и зависимости легче выразить с помощью Ray, чем с многопроцессорной обработкой python, и у него нет проблемы GIL (глобальной блокировки интерпретатора), которая часто мешает эффективной работе многопоточности.Кроме того, в будущем очень легко масштабировать рабочую нагрузку на кластер.

Решение выглядит следующим образом:

import ray

ray.init()

@ray.remote
def task1():
    pass

@ray.remote
def task2():
    pass

@ray.remote
def task3():
    pass

@ray.remote
def dependent1(x1, x2, x3):
    pass

@ray.remote
def task4():
    pass

@ray.remote
def task5():
    pass

@ray.remote
def task6():
    pass

@ray.remote
def dependent2(x1, x2, x3):
    pass

@ray.remote
def dependent3(x, y):
    pass

id1 = task1.remote()
id2 = task2.remote()
id3 = task3.remote()

dependent_id1 = dependent1.remote(id1, id2, id3)

id4 = task4.remote()
id5 = task5.remote()
id6 = task6.remote()

dependent_id2 = dependent2.remote(id4, id5, id6)

dependent_id3 = dependent3.remote(dependent_id1, dependent_id2)

ray.get(dependent_id3) # This is optional, you can get the results if the tasks return an object

Вы также можете передавать фактические объекты Pythonмежду задачами, используя аргументы внутри задач и возвращая результаты (например, говоря «возвращаемое значение» вместо «прохода» выше).

Используя «pip install ray», приведенный выше код работает изна одном компьютере, и также легко распараллеливать приложения в кластере, либо в облаке, либо в вашем собственном кластере, см. https://ray.readthedocs.io/en/latest/autoscaling.html и https://ray.readthedocs.io/en/latest/using-ray-on-a-cluster.html).. Это может пригодиться, еслирабочая нагрузка растет позже.

Отказ от ответственности: я один из разработчиков Ray.

1 голос
/ 23 ноября 2011

Посмотрите на Gevent .

Пример использования:

import gevent
from gevent import socket

def destination(jobs):
    gevent.joinall(jobs, timeout=2)
    print [job.value for job in jobs]

def task1():
    return gevent.spawn(socket.gethostbyname, 'www.google.com')

def task2():
    return gevent.spawn(socket.gethostbyname, 'www.example.com')

def task3():
    return gevent.spawn(socket.gethostbyname, 'www.python.org')

jobs = []
jobs.append(task1())
jobs.append(task2())
jobs.append(task3())
destination(jobs)

Надеюсь, это то, что вы искали.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...