Есть ли способ получить сопоставляемый элемент, когда генерируется исключение akka streams? - PullRequest
0 голосов
/ 17 января 2019

Я хочу иметь возможность регистрировать определенные атрибуты сопоставляемого элемента, если возникает исключение, поэтому мне было интересно, есть ли способ получить сопоставляемый элемент, когда выбрасывается исключение потоками akka?

Если у меня есть:

val decider: Supervision.Decider = { e =>
//val item = getItemThatCausedException
  logger.error("Exception in stream with itemId:"+item.id, e)
  Supervision.Resume
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

Source(List(item1,item2,item3)).map { item =>
  if (item.property < 0) {
    throw new RuntimeException("Error")
  } else {
    i
  }
}

Есть ли способ получить неисправный предмет в Supervision.Decider или после того, как карта готова?

Ответы [ 3 ]

0 голосов
/ 17 января 2019

Вы можете использовать Supervision.Decider для регистрации этих атрибутов.

object Test extends App {

  implicit val system = ActorSystem("test")

  implicit val mat = ActorMaterializer()

  val testSupervisionDecider: Supervision.Decider = {
    case ex: RuntimeException =>
      println(s"some run time exception ${ex.getMessage}")
      Supervision.Resume
    case ex: Exception =>
     //if you want to stop the stream
   Supervision.Stop
  }

  val source = Source(List("1", "2", "3")).map { item =>
    if (item == "2") {
      throw new RuntimeException(s"$item")
    } else {
      item
    }
  }

  source
    .to(Sink.foreach(println(_)))
    .withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
    .run

}

Вывод:

1
some run time exception 2
3
0 голосов
/ 18 января 2019

Это несколько запутанно, но вы можете сделать это, поместив вашу функцию отображения в поток и используя flatMapConcat, например, так:

Source(List(item1, item2, item3)).flatMapConcat { item =>
  Source(List(item))
    .map(mapF)
    .withAttributes(ActorAttributes.supervisionStrategy { e: Throwable =>
      logger.error("Exception in stream with itemId:" + item.id, e)
      Supervision.Resume
    })
}

def mapF(item: Item) =
  if (item.property < 0) {
    throw new RuntimeException("Error")
  } else {
    i
  }

Это возможно, потому что у каждой стадии потока может быть своя собственная стратегия наблюдения.

0 голосов
/ 17 января 2019

Не с Supervision.Decide, но вы могли бы достичь этого по-другому.

Проверить эту программу:

object Streams extends App{

  implicit val system = ActorSystem("test")

  implicit val mat = ActorMaterializer()

  val source = Source(List("1", "2", "3")).map { item =>
    Try {
      if (item == "2") {
        throw new RuntimeException("Error")
      } else {
        item
      }
    }
  }
  source
    .alsoTo(
      Flow[Try[String]]
        .filter(_.isFailure)
        .to(Sink.foreach(t => println("failure: " + t))))
    .to(
      Flow[Try[String]]
        .filter(_.isSuccess)
        .to(Sink.foreach(t => println("success " + t)))).run()

}

Выходы:

success Success(1)
failure: Failure(java.lang.RuntimeException: Error)
success Success(3)
...