Akka Streams обрабатывает исключение от стороннего поставщика GraphStage - PullRequest
0 голосов
/ 25 ноября 2018

Привет. Мне интересно, может ли кто-нибудь помочь мне разобраться с обработкой исключений из GraphStage, созданной третьими лицами.

Я использую GraphStage, который генерирует исключение и останавливает поток.Что я хотел бы сделать, это записать всю информацию об ошибке (все, что было передано в GraphStage до сбоя) и продолжить обработку.Я попытался восстановить и стратегию наблюдения , но они не позволяют потоку продолжаться.

Вот пример GraphStage, который демонстрирует мою проблему.Я выбрасываю Исключение явно, это не может быть лучшей практикой.

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

class TestStage extends GraphStage[FlowShape[Int, Int]] {

  private val in = Inlet[Int]("Test.in")
  private val out = Outlet[Int]("Test.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        val num = grab(in)
        if (num == 5) {
          throw new Exception(s"Number is 5")
        }
        push(out, num)
      }

      override def onPull(): Unit = pull(in)
    }
}

Source(1 to 10)
.via(Flow.fromGraph(new TestStage))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.recover {
case e: Exception => e.getMessage
}
.runForeach(println)

Этот пример, который не использует GraphStage, продолжает обработку.Таким образом, кажется, что исключение, выброшенное из GraphStage, должно рассматриваться по-другому?

  Source(1 to 10)
  .map {
    case 5 => throw new Exception("5 is bad")
    case n => n
  }
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
  .runForeach(println)

спасибо за любую помощь

1 Ответ

0 голосов
/ 05 декабря 2018

Похоже, что это задокументированное поведение некоторых этапов потока akka

Обработка ошибок в потоках :

Операторы, которые поддерживают стратегии наблюдения, явнозадокументировано для этого, если в документации оператора нет ничего, говорящего о том, что он придерживается стратегии надзора, это означает, что он проваливается, а не применяет надзор.

Или это проблема .

...