import luigi as li
class TaskA(li.Task):
def output(self):
return li.LocalTarget('TaskA.txt')
def run(self):
with self.output().open('w') as outfile:
outfile.write('DONE_A')
class TaskB(li.Task):
required_task = li.TaskParameter()
def output(self):
return li.LocalTarget('TaskB.txt')
def requires(self):
return self.required_task
def run(self):
with self.output().open('w') as outfile:
outfile.write('DONE_B')
class TaskC(li.Task):
def output(self):
return li.LocalTarget('TaskC.txt')
def run(self):
with self.output().open('w') as outfile:
outfile.write('DONE_C')
class PipelineX(li.WrapperTask):
def requires(self):
task_a = TaskA()
return TaskB(required_task=task_a)
class PipelineY(li.WrapperTask):
def requires(self):
return TaskC()
class AllPipelines(li.?):
pipeline_x = li.TaskParameter(default=PipelineX())
pipeline_y = li.TaskParameter(default=PipelineY())
# problem: PipelineY depends on PipelineX
# how to first run pipeline_x, wait until it finished, then
# run pipeline_y? Afterwards AllPipelines should complete.
Hello community,
Я ищу способ запуска нескольких (в настоящее время WrapperTasks) подряд.
Я попытался разобрать свою проблему в приведенном выше примере кода и был бы очень рад, если бы кто-нибудь дал мне несколько советов о том, как с этим справиться.
Цель следующая:
- выполнить
PipelineX
- , когда 1. равно
completed()
, выполнить PipelineY
- , когда все будет выполнено, завершить
AllPipelines
Большое спасибо всем за любую помощь!
С уважением
Крис