У меня есть система акторов, которую я использую, чтобы запланировать выполнение функции, которая производит событие в теме Кафки из оператора карты Флинка.В случае исключений система актера завершается и указывается в документации akka (см. https://doc.akka.io/docs/akka/current/scheduler.html#from-akka-actor-actorsystem). Все запланированные задачи должны быть выполнены. В моем случае, когда функция выполняется, java.lang.NoClassDefFoundError относится к классу.используется внутри функции брошен.
new RichMapFunction[String, String] {
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
var myActorSystem: ActorSystem = _
var kafkaProducer: KafkaProducer[String, String] = _
var runtimeContext: RuntimeContext = _
override def map(value: String): String = {
value match {
case "stop" =>
throw new Exception("Stop command received")
case _ =>
myActorSystem.scheduler.scheduleOnce(FiniteDuration(5L, MINUTES)){
kafkaProducer.send(new ProducerRecord[String, String]("test", value.reverse))
}
}
s"scheduled function on event $value"
}
override def open(parameters: Configuration): Unit = {
myActorSystem = ActorSystem("testSystem")
kafkaProducer = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
// props.put("acks", "all")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
new KafkaProducer[String, String](props)
}
runtimeContext = getRuntimeContext
}
override def close(): Unit = {
println("Terminate actor system...")
myActorSystem.terminate()
}
}