Луиджи - как создать зависимость не между файлами, а между задачами? (или как не задействовать метод вывода) - PullRequest
2 голосов
/ 15 мая 2019

Учитывая две задачи luigi, как я могу добавить одну в качестве требования для другой таким образом, чтобы, если требуемая задача была выполнена, вторая задача могла начаться без вывода результатов?

В настоящее время я получаю RuntimeError: Неисполненная зависимость во время выполнения: MyTask ___ home _... , даже если задача выполнена нормально, потому что мои requires / output методы не настроены правильно ...

class ShellTask(ExternalProgramTask):
    """
    ExternalProgramTask's subclass dedicated for one task with the capture output ability.

    Args:
        shell_cmd (str): The shell command to be run in a subprocess.
        capture_output (bool, optional): If True the output is not displayed to console,
                                         and printed after the task is done via 
                                         logger.info (both stdout + stderr).
                                         Defaults to True.
    """
    shell_cmd = luigi.Parameter()
    requirement = luigi.Parameter(default='')
    succeeded = False

    def on_success(self):
        self.succeeded = True

    def requires(self):
        return eval(self.requirement) if self.requirement else None

    def program_args(self):
        """
        Must be implemented in an ExternalProgramTask subclass.
        Returns:
            A script that would be run in a subprocess.Popen.
        Args:
            shell_cmd (luigi.Parameter (str)): the shell command to be passed as args
                                               to the run method (run should not be overridden!).
        """
        return self.shell_cmd.split()


class MyTask(ShellTask):
    """
    Args:    if __name__ == '__main__':
    clean_output_files(['_.txt'])
    task = MyTask(
            shell_cmd='...',
            requirement="MyTask(shell_cmd='...', output_file='_.txt')",
            )
    """
    pass

if __name__ == '__main__':
    task_0 = MyTask(
            shell_cmd='...',
            requirement="MyTask(shell_cmd='...')",
            )
    luigi.build([task_0], workers=2, local_scheduler=False)

Я надеялся, что с помощью on_success можно что-то подсказать для задачи вызывающей стороны, но я не понял, как это сделать.

В настоящее время я преодолеваю это следующим образом:

0) implement the output method based on the input of the task (much like the eval(requirement) I did
2) implement the run method (calling the super run and then writing "ok" to output
3) deleting the output files from main.
4) calling it somehitng like this:

if __name__ == '__main__':
    clean_output_files(['_.txt'])
    task = MyTask(
            shell_cmd='...',
            requirement="MyTask(shell_cmd='...', output_file='_.txt')",
            )

1 Ответ

0 голосов
/ 15 мая 2019

Итак, в вашем первом задании luigi вы можете вызвать второе задание, сделав его обязательным.

Например:

class TaskB(luigi.Task):
  def __init__(self, *args, **kwargs):
      super().__init__(*args, **kwargs)
      self.complete_flag = False
  def run(self):
      self.complete_flag = True
      print('do something')

   def complete(self):
      return self.is_complete

class TaskA(luigi.Task):
   def requires(self):
      return TaskB()

   def run(self):
      print('Carry on with other logic')
...