У меня есть сложный вложенный файл данных json, как показано ниже, и я пытаюсь использовать данные и преобразовать их как
для следующего класса
case class DeviceData (id: Int, device: String)
, где id = 0 и
device = "{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"
Но я застрял на самом первом шаге при использовании данных и преобразовании их в простой фрейм данных и получении ошибки _corrupt_record. Посоветуйте пожалуйста какую ошибку я допустил. Я использую Spark версии 2.4.5
export1. json
0,"{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"
1,"{""device_id"": 1, ""device_type"": ""sensor-igauge"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""213.161.254.1"", ""cca3"": ""NOR"", ""cn"": ""Norway"", ""temp"": 30, ""signal"": 18, ""battery_level"": 6, ""c02_level"": 1413, ""timestamp"" :1475600498 }"
2,"{""device_id"": 2, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""88.36.5.1"", ""cca3"": ""ITA"", ""cn"": ""Italy"", ""temp"": 18, ""signal"": 25, ""battery_level"": 5, ""c02_level"": 1372, ""timestamp"" :1475600500 }"
, и мой искровой код указан ниже
package sparkWCExample.spWCExample
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Dataset
import java.util.Formatter.DateTime
import org.apache.spark.sql.types._ // include the Spark Types to define our schema
import org.apache.spark.sql.functions._ // include the Spark helper functions
import org.apache.spark.sql.functions.to_timestamp
case class DeviceData (id: Int, device: String)
object DatasetExample {
def main(args: Array[String]) {
println("Start now")
val conf = new SparkConf().setAppName("Spark Scala WordCount Example").setMaster("local[1]")
val spark = SparkSession.builder().config(conf).appName("CsvExample").master("local").getOrCreate()
val sc: SparkContext = spark.sparkContext
import spark.implicits._
val readJSONDF = spark.read.json(sc.wholeTextFiles("C:\\Sankha\\Study\\data\\complex-nested-json\\export1.json").values).toDF()
println(readJSONDF.show())
}
}
Я получаю исключение
+--------------------+
| _corrupt_record|
+--------------------+
|0,"{""device_id""...|
+--------------------+