Я получил сломанные данные и хотел бы использовать luigi для их обработки. На самом деле у меня есть все преобразования и т.д. И теперь я имею дело с этим, как я могу передать все свои файлы в конвейер luigi.
У меня есть внешняя задача, которая возвращает имя файла с моими очищенными данными, которое вызывается в списке, созданном glob. Из каждого входного файла я получаю очищенный и преобразованный вывод
Функция - это первый шаг моего конвейера, который я запускаю, как показано ниже:
if __name__ == '__main__':
luigi.build([l.Load()], local_scheduler=False)
class ReadFile(luigi.ExternalTask):
filename = luigi.Parameter()
def output(self):
return luigi.LocalTarget(str(self.filename))
class ExtractRawFiles(luigi.Task):
scrape_date = ''
scrape_hour = ''
def output(self):
return luigi.LocalTarget(f'{LUIGI_RAW_DIRECTORY}/{self.scrape_date}_{self.scrape_hour}.csv')
def requires(self):
for input_filename in glob.glob(f'{conf.RAW_DATA_DIRECTORY}/**/*.csv', recursive=True):
yield ReadFile(input_filename)
def run(self):
for i in self.input():
df.to_csv(f'{LUIGI_RAW_DIRECTORY}/{self.scrape_date}_{self.scrape_hour}.csv')
И вот вопрос. Каков наилучший способ применить конвейер для нескольких файлов.
- передать файл в качестве аргумента для сборки luigi?
- применить чтение всех файлов в конвейере luigi