Spark / Scala - проверка документа JSON в строке потокового DataFrame - PullRequest
1 голос
/ 16 мая 2019

У меня есть потоковое приложение, которое обрабатывает потоковый DataFrame со столбцом «body», содержащим строку JSON.

Таким образом, в теле есть что-то вроде (это четыре входные строки):

{"id":1, "ts":1557994974, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3,"attr3":"something"}]}
{"id":2, "ts":1557994975, "details":[{"id":1,"attr2":"3","attr3":"something"}, {"id":2,"attr2":"3","attr3":"something"},{"id":3,"attr2":"3","attr3":"something"}]}
{"id":3, "ts":1557994976, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3}]}
{"id":4, "ts":1557994977, "details":[]}

Я хотел бы проверить, что каждая строка имеет правильную схему (типы данных и содержит все атрибуты).Я хотел бы отфильтровать и записать недействительные записи где-нибудь (например, файл Parquet).Меня особенно интересует массив «details» - каждый из вложенных документов должен иметь указанные поля и правильные типы данных.

Так что в приведенном выше примере допустима только строка id = 1.

Я думал о классе кейсов, например:

case class Detail(
  id: Int,
  attr2: Int,
  attr3: String
)

case class Input(
  id: Int,
  ts: Long,
  details: Seq[Detail]
)

и попробуйте, но не знаете, как это сделать.

Может кто-нибудь помочь, пожалуйста?

Спасибо

Ответы [ 3 ]

1 голос
/ 16 мая 2019

Один из подходов заключается в использовании JSON Schema , которая может помочь вам с проверкой схемы на данных.Страница Getting * - хорошее место для начала, если вы новичок.

Другой подход будет примерно работать следующим образом:

  1. Построить модели (классы дел) для каждого из объектов, которые вы пытались задать в своем вопросе.

  2. Используйте библиотеку JSON, например Spray JSON / Play-JSON , для анализа входного json.

  3. Для всех входных данных, которые не могут быть проанализированы в допустимых записях, скорее всего, они недействительны, и вы можете разделить эти выходные данные в другом приемнике в вашем искровом коде.Это также сделало бы это устойчивым, если у вас есть метод isValid для объектов, который может проверять правильность проанализированной записи или нет.

0 голосов
/ 17 мая 2019

Вот один из способов использования низкоуровневого API JSON.Мы используем функциональность библиотеки json4s, которая уже включена в Spark.Чтобы применить пользовательский контроль проверки к вашим объектам, вы можете написать два пользовательских считывателя для каждого из них, как показано ниже:

import org.json4s._
import org.json4s.jackson.JsonMethods._

case class Detail(id: Int, attr2: Int, attr3: String)
case class Input(id: Int, ts: Int, details: Seq[Detail])

implicit object DetailReader extends Reader[Detail] {
  def read(value: JValue): Detail = value match {
    case JObject(JField("id", JInt(id))
                 :: JField("attr2", JInt(attr2))
                   :: JField("attr3", JString(attr3))
                     :: Nil)

        if id != null && attr2 != null && attr3 != null 
          => new Detail(id.toInt, attr2.toInt, attr3)

        case _ => null
  }

  def isValidItem(item: Detail) = item != null && item.id > 0 && item.attr2 > 0 && item.attr3 != null
}

implicit object InputReader extends Reader[Input] {
  def read(value: JValue): Input = value match {
      case JObject(JField("id", JInt(id))
                 :: JField("ts", JInt(ts))
                   :: JField("details", JArray(details))
                     :: Nil)

      if id > 0 && ts > 0 && !details.isEmpty && toItemList(details).forall(DetailReader.isValidItem) 
          => new Input(id.toInt, ts.toInt, toItemList(details))

        case _ => null
  }

  private def toItemList(details: List[JValue]) = details.map(_.as[Detail])
}

val df = Seq(
("""{"id":1, "ts":1557994974, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3,"attr3":"something"}]}"""),
("""{"id":2, "ts":1557994975, "details":[{"id":1,"attr2":"3","attr3":"something"}, {"id":2,"attr2":"3","attr3":"something"}"""),
("""{"id":3,"attr2":"3","attr3":"something"}]}"""),
("""{"id":3, "ts":1557994976, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3}]}"""),
("""{"id":4, "ts":1557994977, "details":[]}"""))
.toDF("json")

val extractInputUDF = udf[Input, String]((json: String) => 
   parseOpt(json) match{
    case Some(jv: JValue) => jv.as[Input]
    case _ => null
  }
)

df.withColumn("parsed_item", extractItemUDF($"json"))
.select("parsed_item")
.show(false)

//Output
+-------------------------------------------------------+ 
|parsed_item                                            | 
+-------------------------------------------------------+ 
|[1, 1557994974, [[1, 3, something], [2, 3, something]]]| 
|null                                                   | 
|null                                                   | 
|null                                                   | 
|null                                                   | 
+-------------------------------------------------------+

DetailReader и InputReader отвечают за преобразование необработанных данных JSON вDetail и Input объектов соответственно.Мы реализуем read метод org.json4s.Reader[T] для каждого элемента, который мы хотим проанализировать / проверить.

DetailReader

  • Использовать сопоставление с образцом для сопоставления объектов класса Item на основе членов класса id, attr2 and attr3.
  • После сопоставления с пользовательским шаблономлогика проверки применяется с помощью проверки if id != null && attr2 != null && attr3 != null, которая гарантирует, что все атрибуты имеют действительное ненулевое значение.
  • Если все атрибуты соответствуют данным проверкам, создайте объект Detail на основе их значений, в противном случае возвратитеноль.

InputReader

  • Использование сопоставления с образцом для сопоставления объектов класса Input на основе членов класса id, ts, details.
  • После образцасопоставление пользовательской логики проверки применяется посредством проверки if id > 0 && ts > 0 && !details.isEmpty && toItemList(details).forall(DetailReader.isValidItem), обратите внимание, что здесь необходимо также проверить все элементы массива details.
  • Если все атрибуты соответствуют данным проверкам, создайте *В противном случае объект 1040 *, основанный на их значениях, возвращает ноль.

Наконец, мы реализовали extractInputUDF, который будет использовать вышеупомянутые парсеры и вернет объект Input или ноль.Сначала udf проверяет, является ли данный json допустимым, а затем пытается преобразовать json во входной объект с помощью jv.as[Input].Вы можете проверить json4s библиотеку здесь , если вы с ней не знакомы.

Сноска:

Изначально я пытался добиться того желогика проверки с помощью функции from_json со следующей схемой:

val details = (new StructType)
                    .add(StructField("id", IntegerType, false))
                    .add(StructField("attr2", IntegerType, false))
                    .add(StructField("attr3", StringType, false))

val schema = (new StructType)
   .add(StructField("id", IntegerType, false))
   .add(StructField("ts", LongType, false))
   .add(StructField("details", ArrayType(details, false))) 

Хотя в моем случае from_json не работал должным образом.from_json игнорирует необнуляемый набор правил схемы, который не может проанализировать данные, как ожидалось.Вот несколько тестов с использованием Spark 2.3 / 2.4:

df.withColumn("valid_json", from_json($"json", schema))
  .select("valid_json")
  .show(false)

//Output
+-------------------------------------------------------+ 
|valid_json                                             | 
+-------------------------------------------------------+ 
|[1, 1557994974, [[1, 3, something], [2, 3, something]]]| 
|null                                                   | 
|[3,,]                                                  | 
|[3, 1557994976, [[1, 3, something], [2, 3,]]]          | 
|[4, 1557994977, []]                                    | 
+-------------------------------------------------------+

Я обнаружил, что существует открытый выпуск , связанный с этой проблемой.И, конечно, также есть обсуждение .

0 голосов
/ 16 мая 2019

Самый простой способ для меня - создать Dataframe со схемой, а затем отфильтровать с помощью id == 1. Это не самый эффективный способ.

Слушайте, вы можете найти пример для создания кадра данных со Схемой: https://blog.antlypls.com/blog/2016/01/30/processing-json-data-with-sparksql/

Редактировать Я не могу найти предварительную фильтрацию для ускорения поиска JSON в Scala, но вы можете использовать следующие три параметра:

spark.read.schema(mySchema).format(json).load("myPath").filter($"id" === 1)

или

spark.read.schema(mySchema).json("myPath").filter($"id" === 1)

или

spark.read.json("myPath").filter($"id" === 1)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...