Предостережение: я не совсем знаком с RxJava, поэтому я могу неправильно понять, какая семантика вам нужна.
Поскольку поток не завершен, вы не можете реально объединить его вывод с источником встиль fooSource.operation(barFlow)
, так как это не даст вам разумного графика этапов - сначала нужно предоставить barSource
и объединить barFlow
с этим.
Вы можете составить егонаоборот, хотя, например, так:
val barFlow: Flow[Bar, Bar, NotUsed] = ???
val fooSource: Source[Foo, NotUsed] = ???
val fooToBarFooPairsFlow: Flow[Bar, (Bar, Foo), NotUsed] =
barFlow.zip(fooSource)
, который будет связывать пары значений foo и bar, что позволит вам позже запустить его с barSource.via(fooToBarFooPairsFlow).map { case (bar, foo) => op(bar, foo) }