Как запустить запланированные функции, когда система akka actor завершается - PullRequest
1 голос
/ 07 мая 2019

У меня есть система акторов, которую я использую, чтобы запланировать выполнение функции, которая производит событие в теме Кафки из оператора карты Флинка.В случае исключений система актера завершается и указывается в документации akka (см. https://doc.akka.io/docs/akka/current/scheduler.html#from-akka-actor-actorsystem). Все запланированные задачи должны быть выполнены. В моем случае, когда функция выполняется, java.lang.NoClassDefFoundError относится к классу.используется внутри функции брошен.

new RichMapFunction[String, String] {
      implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
      var myActorSystem: ActorSystem = _
      var kafkaProducer: KafkaProducer[String, String] = _
      var runtimeContext: RuntimeContext = _

      override def map(value: String): String = {
        value match {
          case "stop" =>
            throw new Exception("Stop command received")
          case _ =>
            myActorSystem.scheduler.scheduleOnce(FiniteDuration(5L, MINUTES)){
              kafkaProducer.send(new ProducerRecord[String, String]("test", value.reverse))
            }
        }

        s"scheduled function on event $value"
      }

      override def open(parameters: Configuration): Unit = {
        myActorSystem = ActorSystem("testSystem")
        kafkaProducer = {
          val props = new Properties()
          props.put("bootstrap.servers", "localhost:9092")
          // props.put("acks", "all")
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          new KafkaProducer[String, String](props)
        }
        runtimeContext = getRuntimeContext
      }

      override def close(): Unit = {
        println("Terminate actor system...")
        myActorSystem.terminate()
      }
    }

1 Ответ

0 голосов
/ 07 мая 2019

Завершение системы актера асинхронно, поэтому я использовал приведенный ниже код.

Await.result(myActorSystem.terminate(), scala.concurrent.duration.Duration.Inf)
...