Было бы здорово, если бы что-то подобное можно было заставить работать.Мне не понятно, как реализовать конвергенцию в данный момент, если я не использую необработанный ввод и встраиваю имя входящего фронта, который «тикает» в полезную нагрузку.
import bonobo
from bonobo.config import Configurable
from bonobo.config import use_context
@use_context
class A(Configurable):
def __call__(self, context):
context.set_output_fields(['a'])
yield {'a': 'a'}
@use_context
class B(Configurable):
def __call__(self, context, a):
context.set_output_fields(['b'])
yield {'b': 'b'}
@use_context
class F(Configurable):
def __call__(self, context):
context.set_output_fields(['f'])
yield {'f': 'f'}
@use_context
class G(Configurable):
def __call__(self, context, f):
context.set_output_fields(['g'])
yield {'g': 'g'}
@use_context
class Normalize(Configurable):
def __call__(self, context, b, f):
context.set_output_fields(['n'])
yield {'n': 'n'}
if __name__ == '__main__':
n = Normalize()
graph = bonobo.Graph()
# Here we mark _input to None, so normalize won't get the "begin" impulsion.
graph.add_chain(n, _input=None)
# Add two different chains
graph.add_chain(A(), B(), _output=n)
graph.add_chain(F(), G(), _output=n)
bonobo.run(graph)
В настоящее время этот код приведет кошибка понятная, но прискорбная:
bonobo.errors.UnrecoverableTypeError Input of <__main__.Normalize object at 0x00000261F76EDD30> does not bind to the node signature.
Args: (<NodeExecutionContext(+Normalize) in=1>,)
Input: Bag(b={'b': 'b'})
Kwargs: {}
Signature: (context, b, f)
- A in=1 out=1 [done]
- B in=1 out=1 [done]
- F in=1 out=1 [done]
- G in=1 out=1 [done]
! Normalize in=1 err=1 [defunct]