У меня есть источник, который использует flatMapConcat
для консолидации нового потока элементов, созданного при разборе Json.Для этого я использую JsonReader, предоставленный Alpakka (https://developer.lightbend.com/docs/alpakka/current/data-transformations/json.html).. Это хорошо работает, но я бы хотел разобраться со случаем, когда недопустимый Json представляется читателю. В этом случае я бы хотел, чтобы исключение всплылосупервизору, чтобы он мог перезапустить поток.
Вот небольшой фрагмент, демонстрирующий случай с плохим json:
val systemErrorHandler: Supervision.Decider = {
case e: Exception =>
println("System Error")
Supervision.Restart
case _ => Supervision.Stop
}
val routeErrorHandler: Supervision.Decider = {
case e: Exception =>
println("Route Error")
Supervision.Restart
}
val exceptionStrategy = ActorAttributes.supervisionStrategy(routeErrorHandler)
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(systemErrorHandler))
val json = """{"test":"one}"""
val route = Source.single("1").flatMapConcat { _ =>
Source.single(ByteString(json))
.via(JsonReader.select("$")).map(_.utf8String)
}.to(Sink.foreach(println)).withAttributes(exceptionStrategy)
route.run
Я бы ожидал, что JsonReader при возникновении исключения потерпит неудачу на этапе изатем всплыть исключение до routeErrorHandler
. Однако при запуске этапа JsonReader произошел сбой и исключение было проглочено. Я также попытался поместить recover
в поток, чтобы вызвать исключение на «более высоком» уровне (внеподпотока), но это также не обрабатывается ни супервизором routeErrorHandler, ни супервизором, помещенным в ActorMaterializer.
Это ожидаемое поведение? Если да, то как правильно обеспечить исключение в подпотокедостигает стратегии супервизора?