Привет. Мне интересно, может ли кто-нибудь помочь мне разобраться с обработкой исключений из GraphStage, созданной третьими лицами.
Я использую GraphStage, который генерирует исключение и останавливает поток.Что я хотел бы сделать, это записать всю информацию об ошибке (все, что было передано в GraphStage до сбоя) и продолжить обработку.Я попытался восстановить и стратегию наблюдения , но они не позволяют потоку продолжаться.
Вот пример GraphStage, который демонстрирует мою проблему.Я выбрасываю Исключение явно, это не может быть лучшей практикой.
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
class TestStage extends GraphStage[FlowShape[Int, Int]] {
private val in = Inlet[Int]("Test.in")
private val out = Outlet[Int]("Test.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
setHandlers(in, out, this)
override def onPush(): Unit = {
val num = grab(in)
if (num == 5) {
throw new Exception(s"Number is 5")
}
push(out, num)
}
override def onPull(): Unit = pull(in)
}
}
Source(1 to 10)
.via(Flow.fromGraph(new TestStage))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.recover {
case e: Exception => e.getMessage
}
.runForeach(println)
Этот пример, который не использует GraphStage, продолжает обработку.Таким образом, кажется, что исключение, выброшенное из GraphStage, должно рассматриваться по-другому?
Source(1 to 10)
.map {
case 5 => throw new Exception("5 is bad")
case n => n
}
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
спасибо за любую помощь