Я работаю над потоковым приложением со искрой и пытаюсь проанализировать JSON, указанный в следующем формате.
{"name":"xyz","age":29,"details":["city":"mumbai","country":"India"]}
{"name":"abc","age":25,"details":["city":"mumbai","country":"India"]}
Ниже приведен мой код Spark для анализа JSON:
import org.apache.spark.sql.types._
import spark.implicits._
val schema= new StructType()
.add("name",DataTypes.StringType )
.add("age", DataTypes.IntegerType)
.add("details",
new StructType()
.add("city", DataTypes.StringType)
.add("country", DataTypes.StringType)
)
val dfLogLines = dfRawData.selectExpr("CAST(value AS STRING)") //Converting binary to text
val personNestedDf = dfLogLines.select(from_json($"value", schema).as("person"))
val personFlattenedDf = personNestedDf.selectExpr("person.name", "person.age")
personFlattenedDf.printSchema()
personFlattenedDf.writeStream.format("console").option("checkpointLocation",checkpoint_loc3).start().awaitTermination()
Вывод:
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
-------------------------------------------
Batch: 0
-------------------------------------------
+----+----+
|name| age|
+----+----+
|null|null|
|null|null|
+----+----+
Код не выдает никакой ошибки, но возвращает нулевые значения в выводе. Что я здесь не так делаю? Заранее спасибо.