Я думаю, что вы почти там !!
На самом деле, это путь, описанный в документации . Использование log()
подхода дает вам более детальный контроль уровней ведения журнала для элементов, протекающих через поток, fini sh и сбоя потока. Хотя я не предпочитаю добавлять сообщение журнала внутри стратегия супервизора. Если вы хотите показать это конкретное исключение, то создайте пользовательское исключение, перехватите его в стратегии супервизора и позвольте Akka зарегистрировать это сообщение для вас. Вы можете включить debug-logging
в потоке Akka config , который по умолчанию off
, для дополнительного ведения журнала устранения неполадок на уровне журнала DEBUG. Кроме того, вы также можете включить ведение журнала на уровне актера (см. Эту документацию ).
Я думаю, что в производственной среде могут быть два способа регистрации ошибок:
1) исключение журнала или перезапуска на этапе восстановления. Таким образом, все исключения из восходящего потока будут перехвачены и записаны:
object AkkaStreamRecap extends App {
implicit val system = ActorSystem("AkkaStreamsRecap")
implicit val materialiser = ActorMaterializer()
import system.dispatcher
val source = Source(-5 to 5)
val sink = Sink.foreach[Int](println)
val flow = Flow[Int].map(x => 1 / x)
val runnableGraph = source.
via(flow).
recover {
case e => throw e
}.
to(sink)
runnableGraph.run()
}
Вывод:
0
0
0
0
-1
[ERROR] [03/06/2020 16:27:58.703] [AkkaStreamsRecap-akka.actor.default-dispatcher-2] [akka://AkkaStreamsRecap/system/StreamSupervisor-0/flow-0-0-ignoreSink] Error in stage [Recover(<function1>)]: / by zero
java.lang.ArithmeticException: / by zero
at com.personal.akka.http.chapter1.AkkaStreamRecap$.$anonfun$flow$1(AkkaStreamRecap.scala:41)
at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:480)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:576)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:682)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:731)
at akka.actor.Actor.aroundPreStart(Actor.scala:550)
at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:671)
at akka.actor.ActorCell.create(ActorCell.scala:676)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
at akka.dispatch.Mailbox.run(Mailbox.scala:228)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2) определить стратегию настраиваемого надзора и использовать ее в атрибутах потока или в настройках материализатора:
object AkkaStreamRecap extends App {
implicit val system = ActorSystem("AkkaStreamsRecap")
private val decider: Supervision.Decider = {
case e: ArithmeticException =>
println("Arithmetic exception: Divide by Zero")
Supervision.Stop
}
implicit val materialiser = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
import system.dispatcher
val source = Source(-5 to 5)
val sink = Sink.foreach[Int](println)
val flow = Flow[Int].map(x => 1 / x)
val runnableGraph = source.via(flow).log("divide by zero").to(sink)
runnableGraph.run()
}
вывод:
0
0
0
0
-1
Arithmetic exception: Divide by Zero
[ERROR] [03/06/2020 16:37:00.740] [AkkaStreamsRecap-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://AkkaStreamsRecap/system/StreamSupervisor-0)] [divide by zero] Upstream failed.
java.lang.ArithmeticException: / by zero
at com.personal.akka.http.chapter1.AkkaStreamRecap$.$anonfun$flow$1(AkkaStreamRecap.scala:26)
at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:480)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:576)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:682)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:731)
at akka.actor.Actor.aroundPreStart(Actor.scala:550)
at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:671)
at akka.actor.ActorCell.create(ActorCell.scala:676)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
at akka.dispatch.Mailbox.run(Mailbox.scala:228)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Дайте мне знать, если это поможет !!
PS .. Я не мог найти источник или путь в официальной документации по другим способы регистрации ошибок.