Если вы посмотрите на подписи zip
и zipWith
:
def zip[U](that: Graph[SourceShape[U], _]): Repr[(Out, U)]
def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _])(combine: (Out, Out2) => Out3): Repr[Out3]
оба метода ожидают Source
.
Архивирование Flow
с другим Flow
не будет таким тривиальным, как можно подумать (например, 2-й Flow
может производить несколько элементов на элемент ввода с mapConcat
).
Можно рассмотреть возможность создания пользовательского GraphStage
, как показано в следующем тривиализированном примере:
case class DataIn(id: Int)
case class DataOut(content: String)
case class Signal(s: Int)
class ZipperFlow extends GraphStage[FlowShape[(DataIn, Signal), DataOut]] {
val in = Inlet[(DataIn, Signal)]("ZipperFlow.in")
val out = Outlet[DataOut]("ZipperFlow.out")
override val shape = FlowShape.of(in, out)
override def createLogic(attr: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, DataOut("content-" + grab(in)._1.id))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
Тестирование ZipperFlow
:
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val dataSource = Source(1 to 5).map(DataIn(_))
val signalSource = Source(1 to 5).map(Signal(_))
val sink: Sink[DataOut, Future[Done]] = Sink.foreach(println)
dataSource.zip(signalSource).via(new ZipperFlow).runWith(sink)
// DataOut(content-1)
// DataOut(content-2)
// DataOut(content-3)
// DataOut(content-4)
// DataOut(content-5)