Я новичок в asyncio и реализовал какой-то конвейер с использованием сопрограмм asyncio.Основная идея состоит в том, чтобы иметь различные конвейеры с сопрограммами, которые связаны с агрегированной информацией в полезной нагрузке, и после этого сохранять полезную нагрузку в базе данных.
class ExamplePipeline(PipelineBaseClass):
def __init__(self):
PipelineBaseClass.__init__(self)
self.stages = [get_some_info, build_message,n,..,save_to_db]
self.wrapper = self.connect(self.stages)
def connect(self, stages):
def wrapper(*args, **kwargs):
data_out = yield from stages[0](*args, **kwargs)
for stage in stages[1:]:
data_out = yield from stage(data_out)
return data_out
return wrapper
def run(self, data_in):
asyncio.get_event_loop().run_until_complete(self.wrapper(data_in))
Функции для каждой стадии - сопрограммы, предполагающие что-то вроде
@asyncio.coroutine
def get_some_info(payload):
payload = call_an_endpoint
return payload
@asyncio.coroutine
def build_message(payload):
#do some logic
return payload
У меня есть новое требование, в котором подпрограмме build_message co в некоторых случаях требуется форкать остальную часть конвейера, это то, что остальные этапы n..save_to_db должны выполняться x раз с разными полезными нагрузками.начиная с build_message.
[get_some_info, build_message -> payload 1 [n, .. , save_to_db]
payload 2 [n, .. , save_to_db]
payload x [n, .. , save_to_db]
Итак, на финише у меня есть x разных сообщений на БД.
Как мне реализовать мою новую функцию в этом контексте?