как перенаправить вывод задачи Луиджи - PullRequest
0 голосов
/ 12 мая 2019

У меня есть скрипт bash, который я хочу запустить одновременно в двух разных директориях.Мне удалось запустить его как подкласс luigi.Task с workers>1 (таким образом, работая одновременно).

Мне не удалось перенаправить вывод.Я попытался использовать суперкласс ExternalProgramTask и немного поиграл с output / redirect_output, но я не нашел того места, где хотел.

Это мой код:

#!/bin/pyhton
import luigi
import os
import time

from luigi.contrib.external_program import ExternalProgramTask


class Installer(object):
    def __init__(self, repo):
        self.repo = repo

    def run(self):
        path = path=os.path.join(os.path.abspath('.'), self.repo, 'install_dir')
        cmd='install.sh'
        install_cmd = '{path}/{cmd}'.format(path=path, cmd=cmd)

        start_time = time.asctime()
        print 'running {cmd} on {path} at {time}'.format(cmd=cmd,
                                                         path=path,
                                                         time=start_time)

        os.system(install_cmd)

        print 'finished {cmd} on {path} at {time} (started on {start_time})'.format(
                cmd=cmd,
                path=path,
                time=time.asctime(),
                start_time=start_time)

# class Ex(luigi.Task):
class Ex(ExternalProgramTask):
    repo = luigi.Parameter(default='repo0')
    capture_output = True
    output_file = 'Ex_{}'.format(repo)

    @property
    def priority(self):
        dummy_value = '17'
        return dummy_value

    def run(self):
        with self.output().open('w') as f:
            f.write(Installer(self.repo).run())

    def output(self):
        return luigi.LocalTarget(self.output_file)

    def program_args():
        """Must be overriden in an ExternalProgramTask subclass"""
        pass

@luigi.Task.event_handler(luigi.Event.START)
def started_task(task):
    print 'started {}'.format(task.__dict__)

@luigi.Task.event_handler(luigi.Event.FAILURE)
def started_task(task):
    print '{} failed'.format(task.__dict__)

@luigi.Task.event_handler(luigi.Event.SUCCESS)
def started_task(task):
    print '{} ended successfully'.format(task.__dict__)


if __name__ == '__main__':
    tasks = [Ex(), Ex(repo='repo1')]
    luigi.build(tasks, workers=10, local_scheduler=False)

Что я пытаюсьсделать - это запустить задачи, которые я в данный момент выполняю, но в контролируемой среде, то есть я хочу иметь возможность полностью контролировать вывод.

Изменение задачи на Ex (capture_output = True / False) Не похожеиметь какой-либо эффект.

Подвопросы:

0) Do I have to use the ExternalProgramTask, or can this be done using luigi.Task?
1) How can I properly transfer the output_redirect variable, given I'm running from python (i.e. python my_luigi.py instead of luigi --module ...)
2) I'm using the central scheduler. Should that affect anything?
3) How can I see the output of the task in the visulaizer (localhost:8082)?
...