Ниже приведен упрощенный фрагмент кода, где реализация GraphStateLogic передается в GraphStage в качестве аргумента конструктора: -
package akka.shapes.examples.notworking
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
//This is base graph stage, where GraphStageLogic and SinkShape are passed in constructor parameter
class BaseGraphStage[T](val shape: SinkShape[T], graphStageLogic: GraphStageLogic) extends GraphStage[ SinkShape[T] ] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = graphStageLogic
}
//this is a sample stateful extension of GraphStageLogic, that accepts first ten elements only
class CountLogic(sinkShape: SinkShape[Int], maxValue: Int) extends GraphStageLogic(sinkShape) {
var counter: Long = 0
override def preStart(): Unit = {
pull(sinkShape.in)
}
setHandler(sinkShape.in, new InHandler {
override def onPush(): Unit = {
val e = grab(sinkShape.in)
println("conditional sink : " + e)
counter = counter + 1
counter == maxValue match {
case true => completeStage()
case false => pull(sinkShape.in)
}
}
})
}
object SampleSinkNotWorking {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("NotWroking")
implicit val actorMaterializer = ActorMaterializer()
val inlet = Inlet[Int](name = "sampleInlet")
val sinkShape = SinkShape( inlet )
val countGraphStateLogic = new CountLogic(sinkShape, 10)
val sinkGraphStage = new BaseGraphStage[Int](sinkShape, countGraphStateLogic)
val sink = Sink.fromGraph( sinkGraphStage )
val graph = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
Source(1 to 100) ~> sink
ClosedShape
}
val runnableGraph = RunnableGraph.fromGraph(graph)
runnableGraph.run()
}
}
Выполнение кода выше дает исключение ArrayIndexOutOfBoundsException: -
Исключение впоток "основной" java.lang.ArrayIndexOutOfBoundsException: -1 в akka.stream.stage.GraphStageLogic.setHandler (GraphStage.scala: 439) в akka.shapes.examples.notworking.CountLogic. (SampleSinkNotWorking.scala: 24) вshape.examples.notworking.SampleSinkNotWorking $ .main (SampleSinkNotWorking.scala: 46) в akka.shapes.examples.notworking.SampleSinkNotWorking.main (SampleSinkNotWorking.scala)
я пробовал отладку,например, идентификатор InLet равен -1, и он не получает сброс.
Но почему он не получает сброс, когда GraphStateLogic передается как конструктораргумент для GraphState?