Я думаю, что это обычное дело, и я просто пытаюсь выяснить правильную структуру зависимости luigi, но вот случай:
У меня есть папка, полная CSV-файлов.TaskA создает пары из файлов csv и записывает пути парных файлов в файл .txt
TaskB должен принимать пару путей и что-то делать с двумя файлами csv, целевые пути path
TaskC должен принять выходные данные TaskA и запланировать N TaskBs, по одному разу для каждой пары в выходных данных TaskA.
Примерно код будет выглядеть так:
class TaskA(luigi.Task):
data_dir = luigi.Parameter()
def output(self):
return luigi.LocalTarget('list_of_pairs.txt')
def run(self):
files = os.listdir(self.data_dir)
pairs = MakePairs(files)
with self.output().open('w') as fout:
for x, y in pairs:
fout.write('{},{}\n'.format(x, y))
class TaskB(luigi.Task):
file1 = luigi.Parameter()
file2 = luigi.Parameter()
def run(self):
with open(self.file1, 'r') as f1, open(self.file2, 'r') as f2, self.output().open('w') as fout:
newdata = DoSomething()
fout.write(newdata)
def output(self):
return luigi.LocalTarget(os.path.join(target_dir, self.file1[:-3] + '.foo'))
class TaskC(luigi.Task):
work_dir = luigi.Parameter()
def requires(self):
return TaskA(self.work_dir)
def run(self):
task_b_list = []
with self.input().open('r') as fin:
for line in fin:
file1, file2 = line.split(',')
task_b_list.append(TaskB(file1, file2))
yield task_b_list
DoStuff()
def output(self):
return luigi.LocalTarget('output_c')
Я правильно делаю?Или есть другой способ сделать это?Причина, по которой я делаю это таким образом, заключается в том, что я хочу позволить нескольким работникам выполнять работу, выполняемую в задаче B, потому что файлы большие, поэтому taskC является своего рода планировщиком для taskB