Учитывая две задачи luigi, как я могу добавить одну в качестве требования для другой таким образом, чтобы, если требуемая задача была выполнена, вторая задача могла начаться без вывода результатов?
В настоящее время я получаю RuntimeError: Неисполненная зависимость во время выполнения: MyTask ___ home _... , даже если задача выполнена нормально, потому что мои requires / output
методы не настроены правильно ...
class ShellTask(ExternalProgramTask):
"""
ExternalProgramTask's subclass dedicated for one task with the capture output ability.
Args:
shell_cmd (str): The shell command to be run in a subprocess.
capture_output (bool, optional): If True the output is not displayed to console,
and printed after the task is done via
logger.info (both stdout + stderr).
Defaults to True.
"""
shell_cmd = luigi.Parameter()
requirement = luigi.Parameter(default='')
succeeded = False
def on_success(self):
self.succeeded = True
def requires(self):
return eval(self.requirement) if self.requirement else None
def program_args(self):
"""
Must be implemented in an ExternalProgramTask subclass.
Returns:
A script that would be run in a subprocess.Popen.
Args:
shell_cmd (luigi.Parameter (str)): the shell command to be passed as args
to the run method (run should not be overridden!).
"""
return self.shell_cmd.split()
class MyTask(ShellTask):
"""
Args: if __name__ == '__main__':
clean_output_files(['_.txt'])
task = MyTask(
shell_cmd='...',
requirement="MyTask(shell_cmd='...', output_file='_.txt')",
)
"""
pass
if __name__ == '__main__':
task_0 = MyTask(
shell_cmd='...',
requirement="MyTask(shell_cmd='...')",
)
luigi.build([task_0], workers=2, local_scheduler=False)
Я надеялся, что с помощью on_success
можно что-то подсказать для задачи вызывающей стороны, но я не понял, как это сделать.
В настоящее время я преодолеваю это следующим образом:
0) implement the output method based on the input of the task (much like the eval(requirement) I did
2) implement the run method (calling the super run and then writing "ok" to output
3) deleting the output files from main.
4) calling it somehitng like this:
if __name__ == '__main__':
clean_output_files(['_.txt'])
task = MyTask(
shell_cmd='...',
requirement="MyTask(shell_cmd='...', output_file='_.txt')",
)