Исключение в подпотоке Alpakka JsonReader, которое не перехватывается супервизором - PullRequest
0 голосов
/ 22 сентября 2018

У меня есть источник, который использует flatMapConcat для консолидации нового потока элементов, созданного при разборе Json.Для этого я использую JsonReader, предоставленный Alpakka (https://developer.lightbend.com/docs/alpakka/current/data-transformations/json.html).. Это хорошо работает, но я бы хотел разобраться со случаем, когда недопустимый Json представляется читателю. В этом случае я бы хотел, чтобы исключение всплылосупервизору, чтобы он мог перезапустить поток.

Вот небольшой фрагмент, демонстрирующий случай с плохим json:

val systemErrorHandler: Supervision.Decider = {
  case e: Exception =>
    println("System Error")
    Supervision.Restart
  case _ => Supervision.Stop
}

val routeErrorHandler: Supervision.Decider = { 
  case e: Exception =>
    println("Route Error")
    Supervision.Restart
}

val exceptionStrategy = ActorAttributes.supervisionStrategy(routeErrorHandler)

implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(systemErrorHandler))


val json = """{"test":"one}"""

val route = Source.single("1").flatMapConcat { _ =>
  Source.single(ByteString(json))
    .via(JsonReader.select("$")).map(_.utf8String)
}.to(Sink.foreach(println)).withAttributes(exceptionStrategy)

route.run

Я бы ожидал, что JsonReader при возникновении исключения потерпит неудачу на этапе изатем всплыть исключение до routeErrorHandler. Однако при запуске этапа JsonReader произошел сбой и исключение было проглочено. Я также попытался поместить recover в поток, чтобы вызвать исключение на «более высоком» уровне (внеподпотока), но это также не обрабатывается ни супервизором routeErrorHandler, ни супервизором, помещенным в ActorMaterializer.

Это ожидаемое поведение? Если да, то как правильно обеспечить исключение в подпотокедостигает стратегии супервизора?

...