Применение функций-декораторов вне области создания класса декоратора? - PullRequest
4 голосов
/ 27 января 2020

Я пытаюсь изменить базу кода, найденную в этом Github репозитории . Проект нацелен на создание системы, которая использует декораторы функций для добавления функций в направленный граф acycli c, представляющий набор задач для выполнения на некотором заданном входе.

Я хотел бы изменить проект, создав класс Task, который имеет метод process, предназначенный для выполнения задач в конвейере, а не использования функций верхнего уровня, как в настоящее время функциональность .

class Task:
    def __init__(self, name):
        self.name = name

    #@Pipeline.task()
    def process(self, input):
        return input

class AdditionTask(Task):
    def __init__(self, name, value):
        super().__init__(self, name)
        self.value = value

    @Pipeline.task()
    def process(self, input):
        return map(lambda x: x + self.value, input)

Я предполагаю, что это допустимое украшение, учитывая код example3.py, указанный в этой ссылке , описывающей Python декораторов. Хотя кажется, что при попытке сделать это генерируется ошибка при предоставлении декоратора

TypeError: task () отсутствует 1 обязательный позиционный аргумент: 'self'

Насколько далеко как я могу сказать, когда создается экземпляр AdditionTask, декоратор функции находится не в той же области действия, что и экземпляр объекта Pipeline(). Как видно из примера SSCCE ниже

from collections import deque

# https://github.com/vdymna/generic-python-pipeline/blob/master/pipeline/dag.py
class DAG:
    """Directed acyclic graph structure to manage pipeline task dependecies."""

    def __init__(self):
        self.graph = {}

    def add(self, node, points_to=None):
        """Add new task not to the graph, specify optional 'points_to' parameter."""
        if node not in self.graph:
            self.graph[node] = []

        if points_to:
            if points_to not in self.graph:
                self.graph[points_to] = []
            self.graph[node].append(points_to)  # todo: need to make sure not to add duplicates

        # if sorted tasks and original graph lengths there must be a cycle
        if len(self.sort()) != len(self.graph):
            raise Exception('A cycle is detected in the graph')

    def sort(self):
        """Sort all the task nodes based on the dependencies."""
        self.in_degrees()

        nodes_to_visit = deque()
        for node, pointers in self.degrees.items():
            # find all root nodes
            if pointers == 0:
                nodes_to_visit.append(node)

        sorted_nodes = []

        while nodes_to_visit:
            node = nodes_to_visit.popleft()
            for pointer in self.graph[node]:
                self.degrees[pointer] -= 1
                if self.degrees[pointer] == 0:
                    nodes_to_visit.append(pointer)
            sorted_nodes.append(node)

        return sorted_nodes

    def in_degrees(self):
        """Determing number of in-coming edges for each task node."""
        self.degrees = {}

        for node in self.graph:
            if node not in self.degrees:
                self.degrees[node] = 0
            for pointed in self.graph[node]:
                if pointed not in self.degrees:
                    self.degrees[pointed] = 0
                self.degrees[pointed] += 1

# https://github.com/vdymna/generic-python-pipeline/blob/master/pipeline/pipeline.py
class Pipeline:
    """Create a pipeline by chaining multiple tasks and identifying dependencies."""

    def __init__(self):
        self.tasks = DAG()

    def task(self, depends_on=None):
        """Add new task to the pipeline and specify dependency task (optional)."""

        def inner(func):
            if depends_on:
                self.tasks.add(depends_on, func)
            else:
                self.tasks.add(func)
            return func

        return inner

    def run(self, *args):
        """Execute the pipeline and return each task results."""
        sorted_tasks = self.tasks.sort()
        completed = {}

        for task in sorted_tasks:
            for depend_on, nodes in self.tasks.graph.items():
                if task in nodes:
                    completed[task] = task(completed[depend_on])
            if task not in completed:
                if sorted_tasks.index(task) == 0:
                    completed[task] = task(*args)
                else:
                    completed[task] = task()

        return completed

class Task:
    def __init__(self, name):
        self.name = name

    #@Pipeline.task()
    def process(self, input):
        return input

class AdditionTask(Task):
    def __init__(self, name, value):
        super().__init__(self, name)
        self.value = value

    @Pipeline.task()
    def process(self, input):
        return map(lambda x: x + self.value, input)

if __name__ == "__main__":
    pipeline = Pipeline()
    add_op = AdditionTask(4)
    print(pipeline.run(range(0, 4)))

Есть ли способ, которым я могу декорировать функции класса с помощью декоратора функций, определенного в Pipeline, чтобы обернуть функции класса в функции конвейера?


Единственное потенциальное решение, которое у меня сейчас есть, - создать глобальный класс в Pipeline, то есть вместо определения self.tasks я бы использовал Pipeline.Tasks, таким образом добавление Task с до конвейера не зависит от объема. Но это имеет проблемы, если я хотел бы создать несколько Pipeline с и при администрировании задач конвейера и DAG.


РЕДАКТИРОВАТЬ: Хм, я думаю, что-то хочу Сродни последнему расширенному примеру декоратора в этом посте . Здесь создается декоратор, который применяется к классу. Декоратор проверяет декорированный класс на наличие атрибутов, к которым он может затем применить другой декоратор. В этом случае, декоратор, который определяет время выполнения функции. Из сообщения

def time_this(original_function):      
    print "decorating"                      
    def new_function(*args,**kwargs):
        print "starting timer"       
        import datetime                 
        before = datetime.datetime.now()                     
        x = original_function(*args,**kwargs)                
        after = datetime.datetime.now()                      
        print "Elapsed Time = {0}".format(after-before)      
        return x                                             
    return new_function  

def time_all_class_methods(Cls):
    class NewCls(object):
        def __init__(self,*args,**kwargs):
            self.oInstance = Cls(*args,**kwargs)
        def __getattribute__(self,s):
            """
            this is called whenever any attribute of a NewCls object is accessed. This function first tries to 
            get the attribute off NewCls. If it fails then it tries to fetch the attribute from self.oInstance (an
            instance of the decorated class). If it manages to fetch the attribute from self.oInstance, and 
            the attribute is an instance method then `time_this` is applied.
            """
            try:    
                x = super(NewCls,self).__getattribute__(s)
            except AttributeError:      
                pass
            else:
                return x
            x = self.oInstance.__getattribute__(s)
            if type(x) == type(self.__init__): # it is an instance method
                return time_this(x)                 # this is equivalent of just decorating the method with time_this
            else:
                return x
    return NewCls

#now lets make a dummy class to test it out on:

@time_all_class_methods
class Foo(object):
    def a(self):
        print "entering a"
        import time
        time.sleep(3)
        print "exiting a"

oF = Foo()
oF.a()

Хотя это опять-таки представляет ту же проблему, которую я идентифицировал в исходном тексте публикации, я не вижу способа передать экземпляр Pipeline декоратору, описанному time_all_class_methods , Хотя я могу украсить класс Task классом, похожим на time_all_class_methods, он все равно не будет знать о конкретном экземпляре Pipeline, в который будут добавлены декорированные атрибуты; до Pipeline.tasks.

...