Как сохранить вывод терминала Python Luigi в файл журнала с отметкой времени в имени файла журнала - PullRequest
0 голосов
/ 29 января 2019

У меня есть простой конвейер Луиджи.

import luigi
import subprocess
import row_count_test


class TaskOne(luigi.Task):


    def requires(self):
        return None

    def run(self): 
        output = row_count_test()
        if output:
            with self.output().open('w') as open_file:
                open_file.write('{}'.format(output))

    def output(self):
        return luigi.LocalTarget('TaskOne.txt')


class TaskTwo(luigi.Task):
    def requires(self):
        return TaskOne()
    def run(self):
        subprocess.call('rm *.txt', shell = True)


if __name__ == "__main__":
    luigi.run()

Я запускаю следующий код в командной строке:

python luigi_demo.py --scheduler-host localhost TaskTwo

Я хочу сохранить вывод терминала в файл журнала.Я также хочу иметь возможность добавить метку времени к имени файла журнала.Я знаю, что есть способ сделать это с помощью команд bash.Есть ли способ сделать это с помощью Луиджи?Я посмотрел на документацию luigi.cfg, и она была не слишком полезна.Был бы очень признателен за простой пример.

1 Ответ

0 голосов
/ 01 февраля 2019

Вам просто нужно изменить следующее в вашем TaskTwo.

import datetime as dt
class TaskTwo(luigi.Task):
date= luigi.DateSecondParameter(default=dt.datetime.now())

def output(self):
   # Here you create a file  with your date in it.
   return luigi.LocalTarget('path/to/your/log/file%s.txt' % self.date)

def requires(self):
    return TaskOne()

def run(self):
    self.output().open('w') as f:
       subprocess.call('rm *.txt', shell = True,stdout=f)

Кроме того, если вы хотите удалить файл, созданный в Taskone, вы можете удалить весь код в процессе() затем просто добавьте self.input (). remove ()

class TaskTwo(luigi.Task):
date= luigi.DateSecondParameter(default=dt.datetime.now())

def output():
   return luigi.LocalTarget('path/to/your/log/file%s.txt' % self.date)

def requires(self):
    return TaskOne()

def run(self):
    # this should remove the file created in the  Task one.
    self.input().remove()
...