Как сгенерировать исключение в потоке акка? - PullRequest
1 голос
/ 22 апреля 2019

Я хотел бы выдать исключение следующим образом:

  Source.empty
      .map {
        throw new RuntimeException("Failed")
      }
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }  

Но исключение не появляется в методе onComplete. Это печатает

Exception in thread "main" java.lang.RuntimeException: Failed
    at com.sweetsoft.App$.main(App.scala:30)
    at com.sweetsoft.App.main(App.scala) 

Как вызвать исключение, которое остановит поток и появится в конце?

1 Ответ

2 голосов
/ 22 апреля 2019

В Akka встроена обработка ошибок: Стратегии контроля Akka

val testSupervisionDecider: Supervision.Decider = {
        case ex: java.lang.RuntimeException =>
          println(s"some run time exception ${ex.getMessage}")
          Supervision.Stop
        case ex: Exception =>
          println("Exception occurred and stopping stream",
            ex)
          Supervision.Stop
      }

и вы можете использовать диспетчер наблюдения как

val list = List.range(1, 100)

  Source(list).map { item =>
    if ((item % 2) == 0) {
      throw new RuntimeException(s"$item")
    } else {
      item
    }
  }.withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
    .runWith(Sink.foreach(println)).onComplete {
    case Success(_) =>
      println()
    case Failure(e) =>
      println(s"Thrown ${e.getMessage}")
  }
...