Необходимо проверить, являются ли сообщения о событиях, отправленные на Kafka
, действительными, проверив, есть ли в сообщениях необходимые поля, и если да, передайте данные на Elasticsearch
.Вот как я это сделал:
object App {
val parseJsonStream = (inStream: RDD[String]) => {
inStream.flatMap(json => {
try {
val parsed = parse(json)
Option(parsed)
} catch {
case e: Exception => System.err.println("Exception while parsing JSON: " + json)
e.printStackTrace()
None
}
}).flatMap(v => {
if (v.values.isInstanceOf[List[Map[String, Map[String, Any]]]])
v.values.asInstanceOf[List[Map[String, Map[String, Any]]]]
else if (v.values.isInstanceOf[Map[String, Map[String, Any]]])
List(v.values.asInstanceOf[Map[String, Map[String, Any]]])
else {
System.err.println("EVENT WRONG FORMAT: " + v.values)
List()
}
}).flatMap(mapa => {
val h = mapa.get("header")
val b = mapa.get("body")
if (h.toSeq.toString.contains("session.end") && !b.toSeq.toString.contains("duration")) {
System.err.println("session.end HAS NO DURATION FIELD!")
None
}
else if (h.isEmpty || h.get.get("userID").isEmpty || h.get.get("timestamp").isEmpty) {
throw new Exception("FIELD IS MISSING")
None
}
else {
Some(mapa)
}
})
}
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc, PreferBrokers, Subscribe[String, String](KAFKA_EVENT_TOPICS, kafkaParams)
)
val kafkaStreamParsed = kafkaStream.transform(rdd => {
val eventJSON = rdd.map(_.value)
parseJsonStream(eventJSON)
}
)
val esEventsStream = kafkaStreamParsed.map(addElasticMetadata(_))
try {
EsSparkStreaming.saveToEs(esEventsStream, ELASTICSEARCH_EVENTS_INDEX + "_{postfix}" + "/" + ELASTICSEARCH_TYPE, Map("es.mapping.id" -> "docid")
)
} catch {
case e: Exception =>
EsSparkStreaming.saveToEs(esEventsStream, ELASTICSEARCH_FAILED_EVENTS)
e.printStackTrace()
}
}
Я предполагаю, что кто-то отправляет недопустимые события (вот почему я все равно делаю эту проверку), но Spark job
не пропускает сообщение, оно завершается с ошибкойсообщение:
Исключительная ситуация класса пользователя: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: сбой задачи 2 на этапе 6.0 4 раза, последний сбой: сбой задачи 2.3 на этапе 6.0(TID 190, xxx.xxx.host.xx, исполнитель 3): java.lang.Exception: ПОЛЕ ПРОПУСТИТСЯ
Как я могу предотвратить его сбой и просто вместо этого пропустить сообщение?Это YARN
приложение, использующее:
Spark 2.3.1
Spark-streaming-kafka-0-10_2.11:2.3.1
Scala 2.11.8