Вот один из способов использования низкоуровневого API JSON.Мы используем функциональность библиотеки json4s
, которая уже включена в Spark.Чтобы применить пользовательский контроль проверки к вашим объектам, вы можете написать два пользовательских считывателя для каждого из них, как показано ниже:
import org.json4s._
import org.json4s.jackson.JsonMethods._
case class Detail(id: Int, attr2: Int, attr3: String)
case class Input(id: Int, ts: Int, details: Seq[Detail])
implicit object DetailReader extends Reader[Detail] {
def read(value: JValue): Detail = value match {
case JObject(JField("id", JInt(id))
:: JField("attr2", JInt(attr2))
:: JField("attr3", JString(attr3))
:: Nil)
if id != null && attr2 != null && attr3 != null
=> new Detail(id.toInt, attr2.toInt, attr3)
case _ => null
}
def isValidItem(item: Detail) = item != null && item.id > 0 && item.attr2 > 0 && item.attr3 != null
}
implicit object InputReader extends Reader[Input] {
def read(value: JValue): Input = value match {
case JObject(JField("id", JInt(id))
:: JField("ts", JInt(ts))
:: JField("details", JArray(details))
:: Nil)
if id > 0 && ts > 0 && !details.isEmpty && toItemList(details).forall(DetailReader.isValidItem)
=> new Input(id.toInt, ts.toInt, toItemList(details))
case _ => null
}
private def toItemList(details: List[JValue]) = details.map(_.as[Detail])
}
val df = Seq(
("""{"id":1, "ts":1557994974, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3,"attr3":"something"}]}"""),
("""{"id":2, "ts":1557994975, "details":[{"id":1,"attr2":"3","attr3":"something"}, {"id":2,"attr2":"3","attr3":"something"}"""),
("""{"id":3,"attr2":"3","attr3":"something"}]}"""),
("""{"id":3, "ts":1557994976, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3}]}"""),
("""{"id":4, "ts":1557994977, "details":[]}"""))
.toDF("json")
val extractInputUDF = udf[Input, String]((json: String) =>
parseOpt(json) match{
case Some(jv: JValue) => jv.as[Input]
case _ => null
}
)
df.withColumn("parsed_item", extractItemUDF($"json"))
.select("parsed_item")
.show(false)
//Output
+-------------------------------------------------------+
|parsed_item |
+-------------------------------------------------------+
|[1, 1557994974, [[1, 3, something], [2, 3, something]]]|
|null |
|null |
|null |
|null |
+-------------------------------------------------------+
DetailReader
и InputReader
отвечают за преобразование необработанных данных JSON вDetail
и Input
объектов соответственно.Мы реализуем read
метод org.json4s.Reader[T]
для каждого элемента, который мы хотим проанализировать / проверить.
DetailReader
- Использовать сопоставление с образцом для сопоставления объектов класса Item на основе членов класса
id, attr2 and attr3
. - После сопоставления с пользовательским шаблономлогика проверки применяется с помощью проверки
if id != null && attr2 != null && attr3 != null
, которая гарантирует, что все атрибуты имеют действительное ненулевое значение. - Если все атрибуты соответствуют данным проверкам, создайте объект
Detail
на основе их значений, в противном случае возвратитеноль.
InputReader
- Использование сопоставления с образцом для сопоставления объектов класса Input на основе членов класса
id, ts, details
. - После образцасопоставление пользовательской логики проверки применяется посредством проверки
if id > 0 && ts > 0 && !details.isEmpty && toItemList(details).forall(DetailReader.isValidItem)
, обратите внимание, что здесь необходимо также проверить все элементы массива details
. - Если все атрибуты соответствуют данным проверкам, создайте *В противном случае объект 1040 *, основанный на их значениях, возвращает ноль.
Наконец, мы реализовали extractInputUDF, который будет использовать вышеупомянутые парсеры и вернет объект Input
или ноль.Сначала udf проверяет, является ли данный json допустимым, а затем пытается преобразовать json во входной объект с помощью jv.as[Input]
.Вы можете проверить json4s
библиотеку здесь , если вы с ней не знакомы.
Сноска:
Изначально я пытался добиться того желогика проверки с помощью функции from_json
со следующей схемой:
val details = (new StructType)
.add(StructField("id", IntegerType, false))
.add(StructField("attr2", IntegerType, false))
.add(StructField("attr3", StringType, false))
val schema = (new StructType)
.add(StructField("id", IntegerType, false))
.add(StructField("ts", LongType, false))
.add(StructField("details", ArrayType(details, false)))
Хотя в моем случае from_json
не работал должным образом.from_json
игнорирует необнуляемый набор правил схемы, который не может проанализировать данные, как ожидалось.Вот несколько тестов с использованием Spark 2.3 / 2.4:
df.withColumn("valid_json", from_json($"json", schema))
.select("valid_json")
.show(false)
//Output
+-------------------------------------------------------+
|valid_json |
+-------------------------------------------------------+
|[1, 1557994974, [[1, 3, something], [2, 3, something]]]|
|null |
|[3,,] |
|[3, 1557994976, [[1, 3, something], [2, 3,]]] |
|[4, 1557994977, []] |
+-------------------------------------------------------+
Я обнаружил, что существует открытый выпуск , связанный с этой проблемой.И, конечно, также есть обсуждение .