Как применить 2D-цепочку с Celery - PullRequest
0 голосов
/ 07 августа 2020

У меня есть следующие задачи, которые я хочу реализовать для них


from abc import abstractmethod
from celery import Task
from django.core.paginator import Paginator


class PipelineTask(Task):

    @abstractmethod
    def run(self, *args, **kwargs):
        pass


class BaseStagingTask(PipelineTask):

    @abstractmethod
    def stage(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        self.stage(*args, **kwargs)


class BaseLoadingTask(PipelineTask):

    @abstractmethod
    def load(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        self.load(*args, **kwargs)


class BaseFinalizingTask(PipelineTask):

    @abstractmethod
    def finalize(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        self.finalize(*args, **kwargs)


class BasePipeline:

    def __init__(self, input_paginator, staging_tasks, loading_tasks, finalizing_tasks):
        """
        :type staging_tasks: List[StagingTask]
        :type loading_tasks: List[LoadingTask]
        :type finalizing_tasks: List[LoadingTask]
        :type input_paginator: django.core.paginator.Paginator
        """
        self.input_paginator = input_paginator
        self.staging_tasks = staging_tasks
        self.loading_tasks = loading_tasks
        self.finalizing_tasks = finalizing_tasks


    def run(self):
        pass

pipeline = BasePipeline(
    input_paginator= Paginator(some_queryset),
    staging_tasks=[s1,s2,s3],
    loading_tasks=[l1,l2,l3],
    finalizing_tasks=[f1,f2,f3]
)
pipeline.run()

Мне нужно запускать задачи этапов в последовательном порядке для каждого фрагмента данных и в то же время, чтобы запускать их параллельно по всем данным с условием, что каждая промежуточная задача запускается после завершения предыдущей. Это изображено на диаграмме ниже

введите описание изображения здесь

Мне просто нужен псевдокод для реализации указанного выше конвейера.

1 Ответ

0 голосов
/ 07 августа 2020

Оказывается, нам не нужно использовать собственный класс Task. Что касается промежуточных задач, мы можем указать для каждой задачи очередь, которая запускает одну задачу за раз.

Таким образом, окончательный рабочий процесс будет

group(
    chord(
        chains(s1, s2, s3),
        callback=chord(
            chain(l1, l2, l3),
            callback=group(f1, f2, f3)
        )
    )
)
...