Выключение ActorSystem в потоке akka - PullRequest
0 голосов
/ 11 декабря 2018

У меня есть поток akka, который непрерывно потребляет данные из темы kafka.Я никогда не выключаю систему актеров, я не хочу, чтобы мое приложение закрывалось, это правильно?Как правильно обработать выключение actorySystem?

  implicit val actorSystem: ActorSystem = ActorSystem("mytest")
  implicit val materializer: ActorMaterializer =
    ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val actorConfig = actorSystem.settings.config.getConfig("akka.kafka.consumer")

  val consumerSettings =
    ConsumerSettings(actorConfig, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(config.getString("kafka.hosts"))
      .withGroupId("mytestgrp")


  val flow = Consumer
    .atMostOnceSource(consumerSettings, Subscriptions.topics(config.getString("kafka.topic")))
    .grouped(500)
    .map(Pipeline.process)
    .withAttributes(supervisionStrategy(decider))

  flow.runWith(Sink.ignore)

1 Ответ

0 голосов
/ 15 апреля 2019

Когда поток завершится, вы можете закрыть систему актера

flow.runWith(Sink.ignore).onComplete {
    case _ => actorSystem.shutdown
}
...