Запускать несколько сценариев Python из одного сценария и общаться между ними? - PullRequest
0 голосов
/ 22 ноября 2018

У меня есть сценарий, который я написал, в который я могу передавать аргументы, и я хочу запустить несколько одновременных итераций (возможно, 100+) с уникальными аргументами.Я планировал написать еще один скрипт на Python, который затем запускает эти подписки / процессы, однако, чтобы он был эффективным, мне нужен этот скрипт, чтобы иметь возможность отслеживать подписки на любые ошибки.

Есть ли какой-нибудь простой способ сделатьэто или библиотека, которая предлагает эту функциональность?Я долго искал и мне не повезло найти что-нибудь.Создание подпроцессов и нескольких потоков кажется достаточно простым, но я не могу найти какие-либо руководства или руководства о том, как затем взаимодействовать с этими потоками / подпроцессами.

Ответы [ 3 ]

0 голосов
/ 22 ноября 2018

Я предлагаю использовать threading.Thread или multiprocessing.Process несмотря на требования.

Простой способ связи между потоками / процессами - это использование Очередь .Многопроцессорный модуль предоставляет некоторые другие способы связи между процессами (Очередь, Событие, Менеджер, ...)

Вы можете увидеть некоторые элементарные связи в примере:

import threading
from Queue import Queue
import random
import time


class Worker(threading.Thread):
    def __init__(self, name, queue_error):
        threading.Thread.__init__(self)
        self.name = name
        self.queue_error = queue_error

    def run(self):
        time.sleep(random.randrange(1, 10))
        # Do some processing ...
        # Report errors
        self.queue_error.put((self.name, 'Error state'))


class Launcher(object):
    def __init__(self):
        self.queue_error = Queue()

    def main_loop(self):
        # Start threads
        for i in range(10):
            w = Worker(i, self.queue_error)
            w.start()
        # Check for errors
        while True:
            while not self.queue_error.empty():
                error_data = self.queue_error.get()
                print 'Worker #%s reported error: %s' % (error_data[0], error_data[1])
            time.sleep(0.1)


if __name__ == '__main__':
    l = Launcher()
    l.main_loop()
0 голосов
/ 02 декабря 2018

Как сказал кто-то другой, вам нужно использовать несколько процессов для истинного параллелизма вместо потоков, потому что ограничение GIL препятствует одновременной работе потоков.

Если вы хотите использовать стандартную многопроцессорность библиотеку (которая основана на запуске нескольких процессов), я предлагаю использовать пул рабочих .Если я правильно понял, вы хотите запустить более 100 параллельных экземпляров.Запуск более 100 процессов на одном хосте создаст слишком много накладных расходов.Вместо этого создайте пул работников P, где P, например, число ядер на вашем компьютере, и отправьте более 100 заданий в пул.Это просто сделать, и в Интернете есть много примеров .Кроме того, когда вы отправляете задания в пул, вы можете предоставить функцию обратного вызова для получения ошибок.Это может быть достаточно для ваших нужд (есть примеры здесь ).

Однако пул в многопроцессорной обработке не может распределить работу по нескольким хостам (например, кластеру машин) в прошлый раз, когда я смотрел.Поэтому, если вам нужно это сделать или если вам нужна более гибкая схема связи, например, возможность отправлять обновления контролирующему процессу во время работы рабочих, я предлагаю использовать charm4py (обратите внимание, чтоЯ разработчик charm4py, поэтому у меня есть опыт.

С помощью charm4py вы можете создавать N рабочих, которые распределяются между процессами P по времени выполнения (работает на нескольких хостах), и рабочие могут общаться сКонтроллер просто делает удаленный вызов метода.Вот небольшой пример:

from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threaded
import time

WORKER_ITERATIONS = 100


class Worker(Chare):

    def __init__(self, controller):
        self.controller = controller

    @threaded
    def work(self, x, done_future):
        result = -1
        try:
            for i in range(WORKER_ITERATIONS):
                if i % 20 == 0:
                    # send status update to controller
                    self.controller.progressUpdate(self.thisIndex, i, ret=True).get()
                if i == 5 and self.thisIndex[0] % 2 == 0:
                    # trigger NameError on even-numbered workers
                    test[3] = 3
                time.sleep(0.01)
            result = x**2
        except Exception as e:
            # send error to controller
            self.controller.collectError(self.thisIndex, e)
        # send result to controller
        self.contribute(result, Reducer.gather, done_future)


# This custom map is used to prevent workers from being created on process 0
# (where the controller is). Not strictly needed, but allows more timely
# controller output
class WorkerMap(ArrayMap):
    def procNum(self, index):
        return (index[0] % (charm.numPes() - 1)) + 1


class Controller(Chare):

    def __init__(self, args):
        self.startTime = time.time()
        done_future = charm.createFuture()
        # create 12 workers, which are distributed by charm4py among processes
        workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))
        # start work
        for i in range(12):
            workers[i].work(i, done_future)
        print('Results are', done_future.get())  # wait for result
        exit()

    def progressUpdate(self, worker_id, current_step):
        print(round(time.time() - self.startTime, 3), ': Worker', worker_id,
              'progress', current_step * 100 / WORKER_ITERATIONS, '%')
        # the controller can return a value here and the worker would receive it

    def collectError(self, worker_id, error):
        print(round(time.time() - self.startTime, 3), ': Got error', error,
              'from worker', worker_id)


charm.start(Controller)

В этом примере контроллер будет печатать обновления состояния и ошибки по мере их возникновения.Он напечатает окончательные результаты всех работников, когда они все будут сделаны.Результат для рабочих, которые потерпели неудачу, будет -1.

Количество процессов P дается при запуске.Среда выполнения распределяет N рабочих по доступным процессам.Это происходит, когда рабочие создаются, и в этом конкретном примере отсутствует динамическая балансировка нагрузки.

Также обратите внимание, что в модели charm4py удаленный вызов метода асинхронный и возвращает будущее, которое может заблокировать вызывающая сторона, ноблокирует только вызывающий поток (не весь процесс).

Надеюсь, это поможет.

0 голосов
/ 22 ноября 2018

Лучший способ сделать это - использовать потоки.Если вы создали сценарий, который хотите вызвать, в функцию в этом более крупном сценарии, ваша основная функция могла бы вызывать этот сценарий столько раз, сколько вы хотите, и потоки сообщали бы необходимую информацию.Вы можете прочитать немного о том, как работают потоки здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...