Akka RestartSink не перезапускает поток - PullRequest
0 голосов
/ 13 декабря 2018

У меня есть простой поток akka здесь, и я не могу понять, почему поток никогда не перезапускается правильно после того, как killSwitches прерывает обработку.Любые указатели будут высоко оценены

object TestMain extends App {
    implicit val actorSystem = ActorSystem("TestMain")
    implicit val materializer = ActorMaterializer()
    val sharedKillSwitch = KillSwitches.shared("fp-change-kill-switch")

    // This stream keeps restarting after aborting and stops after 10 times 
    RestartSource.withBackoff(1 second, 1 second, 0.2, 10) {
        () => Source.tick(1 second, 1 second, 200).via(sharedKillSwitch.flow)
    }.to(Sink.foreach(println((_)))).run()

   // This does not restart at all
   Source.tick(1 second, 1 second, 400)
    .via(sharedKillSwitch.flow)
    .to(RestartSink.withBackoff[Int](1 second, 1 second, 0.2, 10) {
      () => Sink.foreach(println(_))
    }).run()

  Thread.sleep(5000)
  sharedKillSwitch.abort(new Exception(""))

}

Мне нужно не перезапускать весь поток (включая источник), а только перезапустить Sink - как в случае 2 только с RestartSink

1 Ответ

0 голосов
/ 13 декабря 2018

Существует разница в RestartSink и RestartSource.

RestartSource перезапускается, когда поток или приемник, к которому он подключен, выдает ошибку или ошибки.

RestartSink проверяет только отмену приемника, которым он управляет.Я думаю, вы должны быть в состоянии интегрировать поток KillSwitch в приемник, который предполагается воссоздать при перезапуске.

Ознакомьтесь с этой документацией .

...