У меня есть задача Луиджи, которая выполняет некоторые нестабильные вычисления. Подумайте о процессе оптимизации, который иногда не сходится.
import luigi
MyOptimizer(luigi.Task):
input_param: luigi.Parameter()
output_filename = luigi.Parameter(default='result.json')
def run(self):
optimize_something(self.input_param, self.output().path)
def output(self):
return luigi.LocalTarget(self.output_filename)
Теперь я хотел бы создать задачу-оболочку, которая будет запускать этот оптимизатор несколько раз с различными входными параметрами и получит выходные данные первого сходящегося запуска.
Я реализую его сейчас, используя , а не , используя MyOptimizer
, потому что, если это не удастся, luigi будет думать, что также не удалось выполнить задачу-оболочку, но я в порядке с некоторыми случаями MyOptimizer
сбой.
MyWrapper(luigi.Task):
input_params_list = luigi.ListParameter()
output_filename = luigi.Parameter(default='result.json')
def run(self):
for input_param in self.input_params_list:
try:
optimize_something(self.input_param, self.output().path)
print(f"Optimizer succeeded with input {input_param}")
break
except Exception as e:
print(f"Optimizer failed with input {input_param}. Trying again...")
def output(self):
return luigi.LocalTarget(self.output_filename)
Проблема в том, что таким образом задачи не распараллеливаются. Кроме того, вы можете себе представить, что MyOptimizer
и optimize_something
являются сложными задачами, которые также участвуют в конвейере данных, обрабатываемом luigi, что создает в моем коде хаос.
Буду признателен за любые идеи и идеи по поводу как сделать эту работу в стиле Луиджи :)