Обработка исключений Spark Structured Streaming - PullRequest
0 голосов
/ 05 декабря 2018

Я читаю данные из потокового источника MQTT с помощью Spark Structured Streaming API.

val lines:= spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "Employee")
  .option("username", "username")
  .option("password", "passwork")
  .option("clientId", "employee11")
  .load("tcp://localhost:8000").as[(String, Timestamp)]

Я преобразовываю потоковые данные в класс дел Employee

case class Employee(Name: String, Department: String)    
val ds = lines.map {
        row =>
          implicit val format = DefaultFormats
          parse(row._1).extract[Employee]
      }
  ....some transformations
 df.writeStream
        .outputMode("append")
        .format("es")
        .option("es.resource", "spark/employee")
        .option("es.nodes", "localhost")
        .option("es.port", 9200)
        .start()
        .awaitTermination()

Теперь в некоторых сообщенияхочередь, структура которой отличается от класса Employee.Допустим, некоторые обязательные столбцы отсутствуют.Мое потоковое задание не выполнено, исключение для поля не найдено.

Теперь я хочу обработать такое исключение, а также отправить уведомление об этом.Я попытался поставить блок try / catch.

case class ErrorMessage(row: String)        
catch {

      case e: Exception =>
        val ds = lines.map {
          row =>
            implicit val format = DefaultFormats
            parse(row._1).extract[ErrorMessage]
        }
        val error = lines.foreach(row => {
          sendErrorMail(row._1)
        })
    }
  }

Получил исключение, что Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; mqtt Любая помощь по этому вопросу будет оценена.

Ответы [ 2 ]

0 голосов
/ 16 мая 2019

Я думаю, вам лучше использовать возвращаемый объект метода start(), как описано в Документация потоковой передачи Spark .Что-то вроде:

val query = df.writeStream. ... .start()
try {
   //If the query has terminated with an exception, then the exception will be thrown.
   query.awaitTermination()
catch {
   case ex: Exception => /*code to send mail*/
}

Реализация вашей собственной раковины foreach может привести к накладным расходам при частом открытии и закрытии соединений.

0 голосов
/ 05 декабря 2018

Я создал приемник foreach в блоке catch и смог обрабатывать исключения и отправлять почтовые оповещения.

catch {
    case e: Exception =>
      val foreachWriter = new ForeachWriter[Row] {
        override def open(partitionId: Timestamp, version: Timestamp): Boolean = {
          true
        }

        override def process(value: Row): Unit = {
         code for sending mail.........
        }

        override def close(errorOrNull: Throwable): Unit = {}
      }
      val df = lines.selectExpr("cast (value as string) as json")
      df.writeStream
        .foreach(foreachWriter)
        .outputMode("append")
        .start()
        .awaitTermination()
  }
...