Можно ли написать задачу-оболочку luigi, которая переносит неудачные подзадачи? - PullRequest
8 голосов
/ 04 мая 2020

У меня есть задача Луиджи, которая выполняет некоторые нестабильные вычисления. Подумайте о процессе оптимизации, который иногда не сходится.

import luigi

MyOptimizer(luigi.Task):
    input_param: luigi.Parameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        optimize_something(self.input_param, self.output().path)

    def output(self):
        return luigi.LocalTarget(self.output_filename)

Теперь я хотел бы создать задачу-оболочку, которая будет запускать этот оптимизатор несколько раз с различными входными параметрами и получит выходные данные первого сходящегося запуска.

Я реализую его сейчас, используя , а не , используя MyOptimizer, потому что, если это не удастся, luigi будет думать, что также не удалось выполнить задачу-оболочку, но я в порядке с некоторыми случаями MyOptimizer сбой.

MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        for input_param in self.input_params_list:
            try:
                optimize_something(self.input_param, self.output().path)
                print(f"Optimizer succeeded with input {input_param}")
                break
            except Exception as e:
                print(f"Optimizer failed with input {input_param}. Trying again...")

    def output(self):
        return luigi.LocalTarget(self.output_filename)

Проблема в том, что таким образом задачи не распараллеливаются. Кроме того, вы можете себе представить, что MyOptimizer и optimize_something являются сложными задачами, которые также участвуют в конвейере данных, обрабатываемом luigi, что создает в моем коде хаос.

Буду признателен за любые идеи и идеи по поводу как сделать эту работу в стиле Луиджи :)

...