Как создать экспоненциальную стратегию перезапуска для определенных типов ошибок в потоке Akka? - PullRequest
0 голосов
/ 15 октября 2018

У нас есть поток, связанный с Кафкой, который должен вести себя по-разному в зависимости от типа исключения.Например, если он имеет SQLException, он должен использовать Supervision.Stop, но если это RetriableException, он должен использовать Supervision.Restart.

Я также хотел бы иметь возможность реализовать стратегию экспоненциального откатадля тех типов ошибок, которые необходимо перезапустить, но в некоторых тестах, которые я сделал, кажется, что использование RestartSource с Decider приводит к игнорированию Decider.

Каков наилучший способреализовать стратегию экспоненциального отката для потоков, которые выбрасывают определенный тип ошибки?

1 Ответ

0 голосов
/ 15 октября 2018

Чтобы решить первую проблему, вы можете просто использовать шаблон определения:

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}

implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))

Я не думаю, что RestartSource игнорирует решение ... Решение является свойством материализатора,который требуется для запуска источника.Если это так, вы должны сообщить об ошибке в Akka Streams.

...