Задание Spark не выполняется при фильтрации сообщений kafka - PullRequest
3 голосов
/ 10 июня 2019

Необходимо проверить, являются ли сообщения о событиях, отправленные на Kafka, действительными, проверив, есть ли в сообщениях необходимые поля, и если да, передайте данные на Elasticsearch.Вот как я это сделал:

object App {

  val parseJsonStream = (inStream: RDD[String]) => {
    inStream.flatMap(json => {
      try {
        val parsed = parse(json)
        Option(parsed)
      } catch {
        case e: Exception => System.err.println("Exception while parsing JSON: " + json)
          e.printStackTrace()
          None
      }
    }).flatMap(v => {
      if (v.values.isInstanceOf[List[Map[String, Map[String, Any]]]])
        v.values.asInstanceOf[List[Map[String, Map[String, Any]]]]
      else if (v.values.isInstanceOf[Map[String, Map[String, Any]]])
        List(v.values.asInstanceOf[Map[String, Map[String, Any]]])
      else {
        System.err.println("EVENT WRONG FORMAT: " + v.values)
        List()
      }
    }).flatMap(mapa => {
      val h = mapa.get("header")
      val b = mapa.get("body")
      if (h.toSeq.toString.contains("session.end") && !b.toSeq.toString.contains("duration")) {
        System.err.println("session.end HAS NO DURATION FIELD!")
        None
      }
      else if (h.isEmpty || h.get.get("userID").isEmpty || h.get.get("timestamp").isEmpty) {
        throw new Exception("FIELD IS MISSING")
        None
      }
      else {
        Some(mapa)
      }
    })
  }

  val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc, PreferBrokers, Subscribe[String, String](KAFKA_EVENT_TOPICS, kafkaParams)
  )
  val kafkaStreamParsed = kafkaStream.transform(rdd => {
    val eventJSON = rdd.map(_.value)
    parseJsonStream(eventJSON)
  }
  )

  val esEventsStream = kafkaStreamParsed.map(addElasticMetadata(_))

  try {
    EsSparkStreaming.saveToEs(esEventsStream, ELASTICSEARCH_EVENTS_INDEX + "_{postfix}" + "/" + ELASTICSEARCH_TYPE, Map("es.mapping.id" -> "docid")
    )
  } catch {
    case e: Exception =>
      EsSparkStreaming.saveToEs(esEventsStream, ELASTICSEARCH_FAILED_EVENTS)
      e.printStackTrace()
  }
}

Я предполагаю, что кто-то отправляет недопустимые события (вот почему я все равно делаю эту проверку), но Spark job не пропускает сообщение, оно завершается с ошибкойсообщение:

Исключительная ситуация класса пользователя: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: сбой задачи 2 на этапе 6.0 4 раза, последний сбой: сбой задачи 2.3 на этапе 6.0(TID 190, xxx.xxx.host.xx, исполнитель 3): java.lang.Exception: ПОЛЕ ПРОПУСТИТСЯ

Как я могу предотвратить его сбой и просто вместо этого пропустить сообщение?Это YARN приложение, использующее:

Spark 2.3.1
Spark-streaming-kafka-0-10_2.11:2.3.1
Scala 2.11.8

1 Ответ

3 голосов
/ 10 июня 2019

Вместо этого

throw new Exception("FIELD IS MISSING")
None

Просто сделайте это

None

При возникновении этого исключения ваша программа завершает работу.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...