У меня есть следующие задачи, которые я хочу реализовать для них
from abc import abstractmethod
from celery import Task
from django.core.paginator import Paginator
class PipelineTask(Task):
@abstractmethod
def run(self, *args, **kwargs):
pass
class BaseStagingTask(PipelineTask):
@abstractmethod
def stage(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
self.stage(*args, **kwargs)
class BaseLoadingTask(PipelineTask):
@abstractmethod
def load(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
self.load(*args, **kwargs)
class BaseFinalizingTask(PipelineTask):
@abstractmethod
def finalize(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
self.finalize(*args, **kwargs)
class BasePipeline:
def __init__(self, input_paginator, staging_tasks, loading_tasks, finalizing_tasks):
"""
:type staging_tasks: List[StagingTask]
:type loading_tasks: List[LoadingTask]
:type finalizing_tasks: List[LoadingTask]
:type input_paginator: django.core.paginator.Paginator
"""
self.input_paginator = input_paginator
self.staging_tasks = staging_tasks
self.loading_tasks = loading_tasks
self.finalizing_tasks = finalizing_tasks
def run(self):
pass
pipeline = BasePipeline(
input_paginator= Paginator(some_queryset),
staging_tasks=[s1,s2,s3],
loading_tasks=[l1,l2,l3],
finalizing_tasks=[f1,f2,f3]
)
pipeline.run()
Мне нужно запускать задачи этапов в последовательном порядке для каждого фрагмента данных и в то же время, чтобы запускать их параллельно по всем данным с условием, что каждая промежуточная задача запускается после завершения предыдущей. Это изображено на диаграмме ниже
введите описание изображения здесь
Мне просто нужен псевдокод для реализации указанного выше конвейера.