Ошибка в spark scala ide при чтении вложенного полного JSON файла - PullRequest
1 голос
/ 13 марта 2020

У меня есть сложный вложенный файл данных json, как показано ниже, и я пытаюсь использовать данные и преобразовать их как

для следующего класса

case class DeviceData (id: Int, device: String)

, где id = 0 и

device = "{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"

Но я застрял на самом первом шаге при использовании данных и преобразовании их в простой фрейм данных и получении ошибки _corrupt_record. Посоветуйте пожалуйста какую ошибку я допустил. Я использую Spark версии 2.4.5

export1. json

0,"{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"
1,"{""device_id"": 1, ""device_type"": ""sensor-igauge"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""213.161.254.1"", ""cca3"": ""NOR"", ""cn"": ""Norway"", ""temp"": 30, ""signal"": 18, ""battery_level"": 6, ""c02_level"": 1413, ""timestamp"" :1475600498 }"
2,"{""device_id"": 2, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""88.36.5.1"", ""cca3"": ""ITA"", ""cn"": ""Italy"", ""temp"": 18, ""signal"": 25, ""battery_level"": 5, ""c02_level"": 1372, ""timestamp"" :1475600500 }"

, и мой искровой код указан ниже

package sparkWCExample.spWCExample

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Dataset
import java.util.Formatter.DateTime
import org.apache.spark.sql.types._  // include the Spark Types to define our schema
import org.apache.spark.sql.functions._ // include the Spark helper functions
import org.apache.spark.sql.functions.to_timestamp

case class DeviceData (id: Int, device: String)

object DatasetExample {

  def main(args: Array[String]) {
    println("Start now")
    val conf = new SparkConf().setAppName("Spark Scala WordCount Example").setMaster("local[1]")
    val spark = SparkSession.builder().config(conf).appName("CsvExample").master("local").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    import spark.implicits._

val readJSONDF = spark.read.json(sc.wholeTextFiles("C:\\Sankha\\Study\\data\\complex-nested-json\\export1.json").values).toDF()
  println(readJSONDF.show())
}
}

Я получаю исключение

+--------------------+
|     _corrupt_record|
+--------------------+
|0,"{""device_id""...|
+--------------------+

1 Ответ

1 голос
/ 14 марта 2020

sc.wholeTextFiles создает PairRDD, ключом которого является имя файла, а значением - содержимое всего файла. Более подробную информацию можно найти здесь .

Возможно, вы захотите использовать spark.read.text , а затем разбить строки:

val df = spark.read.text("export1.json")
  .map(row => {
    val s = row.getAs[String](0)
    val index = s.indexOf(',')
    DeviceData(s.substring(0, index).toInt, s.substring(index+1))
  })
df.show

печатает

+---+--------------------+
| id|              device|
+---+--------------------+
|  0|"{""device_id"": ...|
|  1|"{""device_id"": ...|
|  2|"{""device_id"": ...|
+---+--------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...