Как анализировать записи JSON в структурированном потоке? - PullRequest
2 голосов
/ 23 октября 2019

Я работаю над потоковым приложением со искрой и пытаюсь проанализировать JSON, указанный в следующем формате.

{"name":"xyz","age":29,"details":["city":"mumbai","country":"India"]}
{"name":"abc","age":25,"details":["city":"mumbai","country":"India"]}

Ниже приведен мой код Spark для анализа JSON:

import org.apache.spark.sql.types._
import spark.implicits._
 val schema= new StructType()
    .add("name",DataTypes.StringType )
    .add("age", DataTypes.IntegerType)
    .add("details",
      new StructType()
        .add("city", DataTypes.StringType)
        .add("country", DataTypes.StringType)
    )

  val dfLogLines = dfRawData.selectExpr("CAST(value AS STRING)") //Converting binary to text

  val personNestedDf = dfLogLines.select(from_json($"value", schema).as("person"))
  val personFlattenedDf = personNestedDf.selectExpr("person.name", "person.age")

  personFlattenedDf.printSchema()
  personFlattenedDf.writeStream.format("console").option("checkpointLocation",checkpoint_loc3).start().awaitTermination()

Вывод:

root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+----+----+
|name| age|
+----+----+
|null|null|
|null|null|
+----+----+

Код не выдает никакой ошибки, но возвращает нулевые значения в выводе. Что я здесь не так делаю? Заранее спасибо.

1 Ответ

2 голосов
/ 23 октября 2019

tl; dr JSON выглядит неправильно в поле details.


Из документации from_json стандартная функция:

Возвращает ноль, в случае непарсируемой строки.

Проблема с полем details.

{"details": [" city ":" mumbai "," country ":" India "]}

Выглядит как массив или карта, но ни один не соответствует.

scala> Seq(Array("one", "two")).toDF("value").toJSON.show(truncate = false)
+-----------------------+
|value                  |
+-----------------------+
|{"value":["one","two"]}|
+-----------------------+

scala> Seq(Map("one" -> "two")).toDF("value").toJSON.show(truncate = false)
+-----------------------+
|value                  |
+-----------------------+
|{"value":{"one":"two"}}|
+-----------------------+

scala> Seq(("mumbai", "India")).toDF("city", "country").select(struct("city", "country") as "details").toJSON.show(truncate = false)
+-----------------------------------------------+
|value                                          |
+-----------------------------------------------+
|{"details":{"city":"mumbai","country":"India"}}|
+-----------------------------------------------+

Моя рекомендация заключается в том, чтобы выполнить анализ JSON самостоятельно, используя пользовательскую функцию (UDF).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...