Попытка извлечь GraphStageLogic для пользовательской реализации с сохранением состояния, а затем передать ее в качестве параметра в GraphStage, дает исключение - PullRequest
1 голос
/ 18 мая 2019

Ниже приведен упрощенный фрагмент кода, где реализация GraphStateLogic передается в GraphStage в качестве аргумента конструктора: -

package akka.shapes.examples.notworking

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}

//This is base graph stage, where GraphStageLogic and SinkShape are passed in constructor parameter
class BaseGraphStage[T](val shape: SinkShape[T], graphStageLogic: GraphStageLogic) extends GraphStage[ SinkShape[T] ] {

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = graphStageLogic
}

//this is a sample stateful extension of GraphStageLogic, that accepts first ten elements only
class CountLogic(sinkShape: SinkShape[Int], maxValue: Int) extends GraphStageLogic(sinkShape) {
  var counter: Long = 0

  override def preStart(): Unit = {
    pull(sinkShape.in)
  }

  setHandler(sinkShape.in, new InHandler {
    override def onPush(): Unit = {
      val e = grab(sinkShape.in)
      println("conditional sink : " + e)
      counter = counter + 1
      counter == maxValue match {
        case true => completeStage()
        case false => pull(sinkShape.in)
      }
    }
  })
}


object SampleSinkNotWorking {

  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem("NotWroking")
    implicit val actorMaterializer = ActorMaterializer()

    val inlet = Inlet[Int](name = "sampleInlet")
    val sinkShape = SinkShape( inlet )
    val countGraphStateLogic = new CountLogic(sinkShape, 10)

    val sinkGraphStage = new BaseGraphStage[Int](sinkShape, countGraphStateLogic)
    val sink = Sink.fromGraph( sinkGraphStage )

    val graph = GraphDSL.create() { implicit builder =>

      import GraphDSL.Implicits._

      Source(1 to 100) ~> sink

      ClosedShape
    }

    val runnableGraph = RunnableGraph.fromGraph(graph)

    runnableGraph.run()
  }
}

Выполнение кода выше дает исключение ArrayIndexOutOfBoundsException: -

Исключение впоток "основной" java.lang.ArrayIndexOutOfBoundsException: -1 в akka.stream.stage.GraphStageLogic.setHandler (GraphStage.scala: 439) в akka.shapes.examples.notworking.CountLogic. (SampleSinkNotWorking.scala: 24) вshape.examples.notworking.SampleSinkNotWorking $ .main (SampleSinkNotWorking.scala: 46) в akka.shapes.examples.notworking.SampleSinkNotWorking.main (SampleSinkNotWorking.scala)

я пробовал отладку,например, идентификатор InLet равен -1, и он не получает сброс.

enter image description here

Но почему он не получает сброс, когда GraphStateLogic передается как конструктораргумент для GraphState?

1 Ответ

0 голосов
/ 18 мая 2019

Я немного реорганизовал ваш код, и проблема исчезла, посмотрите:


class BaseGraphStage(maxValue: Int) extends GraphStage[SinkShape[Int]] {

  val inlet = Inlet[Int](name = "sampleInlet")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      var counter: Int = 0

      setHandler(inlet, new InHandler {
        override def onPush(): Unit = {
          val e = grab(inlet)
          log.info(s"$e is consumed")
          counter += 1
          if (counter == maxValue) {
            completeStage()
          } else {
            pull(inlet)
          }
        }
      })

      override def preStart(): Unit =
        pull(inlet)

      override def postStop(): Unit =
        counter = 0
    }

  override def shape: SinkShape[Int] = SinkShape(inlet)
}

object SampleSinkNotWorking {

  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem("NotWorking")
    implicit val actorMaterializer = ActorMaterializer()


    val sink = Sink.fromGraph(new BaseGraphStage(10))

    Source(1 to 100).runWith(sink)
  }
}

Не могу ответить полностью на ваш последний вопрос, но я думаю, что вся хитрость заключается в создании входных данных в контексте стадии графа, а не из этого, и использовании обработчиков pre и post. Надеюсь, это поможет.

...