У меня есть простой поток akka здесь, и я не могу понять, почему поток никогда не перезапускается правильно после того, как killSwitches прерывает обработку.Любые указатели будут высоко оценены
object TestMain extends App {
implicit val actorSystem = ActorSystem("TestMain")
implicit val materializer = ActorMaterializer()
val sharedKillSwitch = KillSwitches.shared("fp-change-kill-switch")
// This stream keeps restarting after aborting and stops after 10 times
RestartSource.withBackoff(1 second, 1 second, 0.2, 10) {
() => Source.tick(1 second, 1 second, 200).via(sharedKillSwitch.flow)
}.to(Sink.foreach(println((_)))).run()
// This does not restart at all
Source.tick(1 second, 1 second, 400)
.via(sharedKillSwitch.flow)
.to(RestartSink.withBackoff[Int](1 second, 1 second, 0.2, 10) {
() => Sink.foreach(println(_))
}).run()
Thread.sleep(5000)
sharedKillSwitch.abort(new Exception(""))
}
Мне нужно не перезапускать весь поток (включая источник), а только перезапустить Sink - как в случае 2 только с RestartSink