Реализация конфигурации динамического графа luigi - PullRequest
0 голосов
/ 26 июня 2018

Я новичок в luigi, столкнулся с ним при разработке конвейера для наших усилий по ML. Хотя он не подходил для моего конкретного случая использования, в нем было так много дополнительных функций, что я решил сделать его подходящим.

По сути, я искал способ сохранить пользовательский конвейер, чтобы его результаты можно было повторять и было проще развернуть, после прочтения большинства онлайн-руководств, которые я пытался реализовать в своей сериализации, используя существующую * 1003. * конфигурации и механизмов командной строки, и этого могло бы хватить для параметров задач, но он не предоставил способа сериализации подключения DAG моего конвейера, поэтому я решил использовать WrapperTask, получивший json config file, который затем создал бы всю задачу экземпляры и соедините все каналы ввода-вывода задач luigi (выполните всю сантехнику).

Настоящим прилагаю небольшую тестовую программу для проверки:

import random
import luigi
import time
import os


class TaskNode(luigi.Task):
    i = luigi.IntParameter()  # node ID

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.required = []

    def set_required(self, required=None):
        self.required = required  # set the dependencies
        return self

    def requires(self):
        return self.required

    def output(self):
        return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
        self.process()

    def process(self):
        raise NotImplementedError(self.__class__.__name__ + " must implement this method")


class FastNode(TaskNode):

    def process(self):
        time.sleep(1)


class SlowNode(TaskNode):

    def process(self):
        time.sleep(2)


# This WrapperTask builds all the nodes 
class All(luigi.WrapperTask):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        num_nodes = 513

        classes = TaskNode.__subclasses__()
        self.nodes = []
        for i in reversed(range(num_nodes)):
            cls = random.choice(classes)

            dependencies = random.sample(self.nodes, (num_nodes - i) // 35)

            obj = cls(i=i)
            if dependencies:
                obj.set_required(required=dependencies)
            else:
                obj.set_required(required=None)

            # delete existing output causing a build all
            if obj.output().exists():
                obj.output().remove()  

            self.nodes.append(obj)

    def requires(self):
        return self.nodes


if __name__ == '__main__':
    luigi.run()

Таким образом, в основном, как указано в заголовке вопроса, он фокусируется на динамических зависимостях и генерирует a 513 node dependency DAG с p=1/35 connectivity probability, он также определяет класс All (как в make all) как WrapperTask, который требует всех узлов быть построенным для того, чтобы считать его выполненным (у меня есть версия, которая только соединяет его с головками подключенных компонентов DAG, но я не хотел слишком усложнять).

Существует ли более стандартный (Luigic) способ реализации этого? Особо отметим не очень приятное осложнение с методами TaskNode init и set_required, я сделал это только потому, что получение параметров в методе init каким-то образом конфликтует с тем, как luigi регистрирует параметры. Я также попробовал несколько других способов, но это был в основном самый приличный (который работал)

Если не существует стандартного способа, которым я все еще хотел бы услышать любые ваши идеи о том, как я планирую идти, прежде чем я закончу реализацию фреймворка.

1 Ответ

0 голосов
/ 02 ноября 2018

I вчера ответил на аналогичный вопрос с демонстрацией. Я основал это почти полностью на примере в документации. . В документах назначение динамических зависимостей с помощью yeild задач выглядит так, как они предпочитают.

luigi.Config и динамические зависимости могут дать вам конвейер почти бесконечной гибкости. Они также описывают манекен Task, который вызывает здесь несколько цепочек зависимостей , , который может дать вам еще больший контроль.

...