Применить конвейер Луиджи к нескольким файлам - PullRequest
0 голосов
/ 22 июня 2019

Я получил сломанные данные и хотел бы использовать luigi для их обработки. На самом деле у меня есть все преобразования и т.д. И теперь я имею дело с этим, как я могу передать все свои файлы в конвейер luigi. У меня есть внешняя задача, которая возвращает имя файла с моими очищенными данными, которое вызывается в списке, созданном glob. Из каждого входного файла я получаю очищенный и преобразованный вывод

Функция - это первый шаг моего конвейера, который я запускаю, как показано ниже:

if __name__ == '__main__':
    luigi.build([l.Load()], local_scheduler=False)
class ReadFile(luigi.ExternalTask):
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(str(self.filename))


class ExtractRawFiles(luigi.Task):
    scrape_date = ''
    scrape_hour = ''

    def output(self):
        return luigi.LocalTarget(f'{LUIGI_RAW_DIRECTORY}/{self.scrape_date}_{self.scrape_hour}.csv')

    def requires(self):
        for input_filename in glob.glob(f'{conf.RAW_DATA_DIRECTORY}/**/*.csv', recursive=True):
            yield ReadFile(input_filename)

    def run(self):
        for i in self.input():
            df.to_csv(f'{LUIGI_RAW_DIRECTORY}/{self.scrape_date}_{self.scrape_hour}.csv')

И вот вопрос. Каков наилучший способ применить конвейер для нескольких файлов.

  • передать файл в качестве аргумента для сборки luigi?
  • применить чтение всех файлов в конвейере luigi
...