Python: существующее решение для ветвления рабочего процесса / конвейера? - PullRequest
0 голосов
/ 09 марта 2012

В моем приложении я реализовал очень грубый рабочий процесс, состоящий из 5 различных «процессорных единиц». Код на данный момент структурирован так:

def run(self, result_first_step=None, result_second_step=None):

    config = read_workflow_config("config.ini")

    if config.first_step:
        result_first_step = run_process_1()

    if config.second_step and result_first_step is not None:
        result_second_step = run_process_2(result_first_step)
    else:
        raise Exception("Missing required data")

    if config.third_step:
        result_third_step = run_process_3(result_first_step, result_second_step)
    else:
        result_third_step = None

    collect_results(result_first_step, result_second_step, result_third_step)

и так далее. Код работает, но его трудно обслуживать и он достаточно хрупок (обработка намного сложнее, чем в этом упрощенном примере). Итак, я думал о принятии другой стратегии, то есть создания правильного рабочего процесса с:

  • Короткое замыкание: я не могу предоставить данные для процесса запуска или два разных типа данных. В последнем случае рабочий процесс замыкается и пропускает некоторую обработку
  • Общие объекты: конфигурация, подобная вещам, доступна для всех юнитов
  • Условия: в зависимости от конфигурации некоторые биты могут быть пропущены

Существует ли библиотека Python для выполнения таких рабочих процессов, или я должен свернуть свою собственную? Я пробовал pyutilib.workflow, но он не поддерживает должным образом общий объект конфигурации, за исключением передачи его всем работникам (утомительно).

Примечание: это для библиотеки / приложения командной строки, поэтому веб-решения для рабочих процессов не подходят.

Ответы [ 2 ]

0 голосов
/ 22 апреля 2012

Существует целый ряд подходов к конвейерам в Python, от полстраницы до ...
Вот основная идея: наверху поместите все определения шагов в dict;
, затем конвейер(например, «CAT») выполняет шаги C, A, T.

class Pipelinesimple:
    """p = Pipelinesimple( funcdict );  p.run( "C A T" ) = C(X) | A | T

    funcdict = dict( A = Afunc, B = Bfunc ... Z = Zfunc )
    pipeline = Pipelinesimple( funcdict )
    cat = pipeline.run( "C A T", X )  # C(X) | A | T, i.e. T( A( C(X) ))
    dog = pipeline.run( "D O G", X, **kw )  # D(X, **kw) | O(**kw) | G(**kw)
    """

def __init__( self, funcdict ):
    self.funcdict = funcdict  # funcs or functors of X

def run( self, steps, X, **commonargs ):
    """ steps "C A T" or ["C", "A", "T"]
        all funcs( X, **commonargs )
    """

    if isinstance( steps, basestring ):
        steps = steps.split()  # "C A T" -> ["C", "A", "T"]
    for step in steps:
        func = self.funcdict(step)
        # if X is None: ...
        X = func( X, **commonargs )
    return X

Далее, есть несколько способов присвоения различных параметров различным шагам.

Один из способов - это анализмногострочная строка, такая как

""" C  ca=5  cb=6 ...
    A  aa=1 ...
    T  ...
"""

Другая - взять список функций / имен функций / параметров, например,

pipeline.run( ["C", dict(ca=5, cb=6), lambda ..., "T", dict(ta=3) ])

Третий - разделить параметры "A__aa B__ba ... "путь sklearn.pipeline.Pipeline .делает.(Это предназначено для машинного обучения, но вы можете копировать части конвейера.)

У каждого из них есть довольно четкие плюсы и минусы.

Может прийти большое сообщество талантливых людей.с десятком прототипов решения проблемы [конвейеров] очень быстро.
Но сокращение дюжины до двух или трех занимает вечность.

Какой бы способ вы ни выбрали, предоставьте способ регистрации все параметры для прогона.

См. также:
FilterPype
nipype

0 голосов
/ 10 марта 2012

Вы можете превратить метод запуска в генератор;

def run(self)
  result_first_step = run_process_1()
  yield result_first_step
  result_second_step = run_process_2(result_first_step)
  yield result_second_step
  result_third_step = run_process_3(result_first_step, result_second_step)
  collect_results(result_first_step, result_second_step, result_third_step)
...