запустить два luigi WrapperTasks подряд - PullRequest
0 голосов
/ 04 октября 2019
import luigi as li


class TaskA(li.Task):

    def output(self):
        return li.LocalTarget('TaskA.txt')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('DONE_A')


class TaskB(li.Task):
    required_task = li.TaskParameter()

    def output(self):
        return li.LocalTarget('TaskB.txt')

    def requires(self):
        return self.required_task

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('DONE_B')


class TaskC(li.Task):

    def output(self):
        return li.LocalTarget('TaskC.txt')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('DONE_C')


class PipelineX(li.WrapperTask):

    def requires(self):
        task_a = TaskA()
        return TaskB(required_task=task_a)


class PipelineY(li.WrapperTask):

    def requires(self):
        return TaskC()


class AllPipelines(li.?):
    pipeline_x = li.TaskParameter(default=PipelineX())
    pipeline_y = li.TaskParameter(default=PipelineY())

    # problem: PipelineY depends on PipelineX
    # how to first run pipeline_x, wait until it finished, then
    # run pipeline_y? Afterwards AllPipelines should complete.

Hello community,

Я ищу способ запуска нескольких (в настоящее время WrapperTasks) подряд.

Я попытался разобрать свою проблему в приведенном выше примере кода и был бы очень рад, если бы кто-нибудь дал мне несколько советов о том, как с этим справиться.

Цель следующая:

  1. выполнить PipelineX
  2. , когда 1. равно completed(), выполнить PipelineY
  3. , когда все будет выполнено, завершить AllPipelines

Большое спасибо всем за любую помощь!

С уважением

Крис

1 Ответ

1 голос
/ 05 октября 2019

Во-первых, поскольку вы говорите, что PipelineY зависит от PipelineX, наиболее естественным было бы включение PipelineX в требования PipelineY:

def PipelineY(luigi.WrapperTask):
    def requires(self):
        return [PipelineY, TaskC]

Однако я держу паричто на самом деле TaskC зависит от PipelineY, поэтому вы можете поместить PipelineY в зависимости TaskC.

Если вам действительно нужен конвейер, и вышеприведенное не работаетдля вас вы можете использовать динамические зависимости luigi (https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies):

def AllPipelines(luigi.Task):
    def output(self):
        return luigi.LocalTarget("success.txt")

    def run(self):
        yield PipelineX()
        yield PipelineY()
        with self.output().open('w') as out_file:
            out_file.write("1")
...