Отвечаю на это сам, так как это было решено полезными ребятами на Discussion.lightbend.com, см. https://discuss.lightbend.com/t/graphstage-with-shape-of-2-in-and-2-out/4160/3
Ответ на этот вопрос заключается в простом использовании BidiShape
. Несмотря на иное разоблачающее имя, логика BidiShape
ни в коем случае не должна быть двунаправленной (это очевидно в ретроспективе, но я был сбит с толку этим).
Некоторый код, который можно использовать для справки, если кто-то находится в аналогичной ситуации, когда он должен что-то делать на основе двух входов с возможностью нажатия на два выхода:
class BiNoneCounter[T]() extends GraphStage[BidiShape[Option[T], Option[Int], Option[T], Option[Int]]] {
private val leftIn = Inlet[Option[T]]("BiNoneCounter.in1")
private val rightIn = Inlet[Option[T]]("BiNoneCounter.in2")
private val leftOut = Outlet[Option[Int]]("BiNoneCounter.out1")
private val rightOut = Outlet[Option[Int]]("BiNoneCounter.out2")
override val shape = BidiShape(leftIn, leftOut, rightIn, rightOut)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var grabNextPush = false
val inHandler = new InHandler {
override def onPush(): Unit = {
if (grabNextPush) {
(grab(leftIn), grab(rightIn)) match {
// do stuff here
}
}
grabNextPush = !grabNextPush
}
}
val outHandler = (inlet: Inlet[Option[T]]) => new OutHandler {
override def onPull(): Unit = {
pull(inlet)
}
}
setHandler(leftOut, outHandler(leftIn))
setHandler(rightOut, outHandler(rightIn))
setHandler(leftIn, inHandler)
setHandler(rightIn, inHandler)
}
}
Можно использовать так:
sourceOne ~> bidi.in1
bidi.out1 ~> sinkOne
sourceTwo ~> bidi.in2
bidi.out2 ~> sinkTwo