Трубопроводы с асинцио сопрограммы - PullRequest
3 голосов
/ 07 марта 2019

Я новичок в asyncio и реализовал какой-то конвейер с использованием сопрограмм asyncio.Основная идея состоит в том, чтобы иметь различные конвейеры с сопрограммами, которые связаны с агрегированной информацией в полезной нагрузке, и после этого сохранять полезную нагрузку в базе данных.

class ExamplePipeline(PipelineBaseClass):
    def __init__(self):
        PipelineBaseClass.__init__(self)
        self.stages = [get_some_info, build_message,n,..,save_to_db]
        self.wrapper = self.connect(self.stages)

    def connect(self, stages):
        def wrapper(*args, **kwargs):
            data_out = yield from stages[0](*args, **kwargs)
            for stage in stages[1:]:
                data_out = yield from stage(data_out)
            return data_out
        return wrapper

    def run(self, data_in):
      asyncio.get_event_loop().run_until_complete(self.wrapper(data_in))

Функции для каждой стадии - сопрограммы, предполагающие что-то вроде

@asyncio.coroutine
def get_some_info(payload):
      payload = call_an_endpoint
      return payload

@asyncio.coroutine
def build_message(payload):
      #do some logic
      return payload

У меня есть новое требование, в котором подпрограмме build_message co в некоторых случаях требуется форкать остальную часть конвейера, это то, что остальные этапы n..save_to_db должны выполняться x раз с разными полезными нагрузками.начиная с build_message.

[get_some_info, build_message -> payload 1 [n, .. , save_to_db]
                                 payload 2 [n, .. , save_to_db]
                                 payload x [n, .. , save_to_db]

Итак, на финише у меня есть x разных сообщений на БД.

Как мне реализовать мою новую функцию в этом контексте?

...