Запустите ту же задачу с разными параметрами и динамическими зависимостями с помощью Luigi - PullRequest
0 голосов
/ 25 октября 2018

Я думаю, что это обычное дело, и я просто пытаюсь выяснить правильную структуру зависимости luigi, но вот случай:

У меня есть папка, полная CSV-файлов.TaskA создает пары из файлов csv и записывает пути парных файлов в файл .txt

TaskB должен принимать пару путей и что-то делать с двумя файлами csv, целевые пути path

TaskC должен принять выходные данные TaskA и запланировать N TaskBs, по одному разу для каждой пары в выходных данных TaskA.

Примерно код будет выглядеть так:

class TaskA(luigi.Task):
    data_dir = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('list_of_pairs.txt')

    def run(self):
        files = os.listdir(self.data_dir)
        pairs = MakePairs(files)
        with self.output().open('w') as fout:
            for x, y in pairs:
                 fout.write('{},{}\n'.format(x, y))

class TaskB(luigi.Task):
    file1 = luigi.Parameter()
    file2 = luigi.Parameter()

    def run(self):
        with open(self.file1, 'r') as f1, open(self.file2, 'r') as f2, self.output().open('w') as fout:
            newdata = DoSomething()
            fout.write(newdata)

    def output(self):
        return luigi.LocalTarget(os.path.join(target_dir, self.file1[:-3] + '.foo'))


class TaskC(luigi.Task):
    work_dir = luigi.Parameter()

    def requires(self):
        return TaskA(self.work_dir)

    def run(self):
        task_b_list = []
        with self.input().open('r') as fin:
            for line in fin:
                file1, file2 = line.split(',')
                task_b_list.append(TaskB(file1, file2))
        yield task_b_list
        DoStuff()

def output(self):
    return luigi.LocalTarget('output_c')

Я правильно делаю?Или есть другой способ сделать это?Причина, по которой я делаю это таким образом, заключается в том, что я хочу позволить нескольким работникам выполнять работу, выполняемую в задаче B, потому что файлы большие, поэтому taskC является своего рода планировщиком для taskB

...