Я читаю данные из потокового источника MQTT с помощью Spark Structured Streaming API.
val lines:= spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", "Employee")
.option("username", "username")
.option("password", "passwork")
.option("clientId", "employee11")
.load("tcp://localhost:8000").as[(String, Timestamp)]
Я преобразовываю потоковые данные в класс дел Employee
case class Employee(Name: String, Department: String)
val ds = lines.map {
row =>
implicit val format = DefaultFormats
parse(row._1).extract[Employee]
}
....some transformations
df.writeStream
.outputMode("append")
.format("es")
.option("es.resource", "spark/employee")
.option("es.nodes", "localhost")
.option("es.port", 9200)
.start()
.awaitTermination()
Теперь в некоторых сообщенияхочередь, структура которой отличается от класса Employee
.Допустим, некоторые обязательные столбцы отсутствуют.Мое потоковое задание не выполнено, исключение для поля не найдено.
Теперь я хочу обработать такое исключение, а также отправить уведомление об этом.Я попытался поставить блок try / catch.
case class ErrorMessage(row: String)
catch {
case e: Exception =>
val ds = lines.map {
row =>
implicit val format = DefaultFormats
parse(row._1).extract[ErrorMessage]
}
val error = lines.foreach(row => {
sendErrorMail(row._1)
})
}
}
Получил исключение, что Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
mqtt
Любая помощь по этому вопросу будет оценена.