Искровой поток JSON значение в столбце данных scala - PullRequest
0 голосов
/ 10 марта 2020

У меня есть текстовый файл со значением json. и это читается в DF

{"name":"Michael"}
{"name":"Andy", "age":30}

Я хочу выводить схему динамически для каждой строки при Потоковое и сохранять ее в отдельных местах (таблицах) в зависимости от ее схемы.

к сожалению, пока я пытаюсь прочитать value.schema, он по-прежнему отображается как String. Пожалуйста, помогите, как это сделать при потоковой передаче, поскольку RDD не допускается при потоковой передаче.

Я хотел использовать следующий код, который не работает, поскольку значение по-прежнему читается как строковый формат.

val jsonSchema = newdf1.select("value").as[String].schema
val df1 = newdf1.select(from_json($"value", jsonSchema).alias("value_new"))
val df2 = df1.select("value_new.*")

Я даже пытался использовать,

schema_of_json("json_schema"))

val jsonSchema: String = newdf.select(schema_of_json(col("value".toString))).as[String].first()

все еще нет надежды .. Пожалуйста, помогите ..

1 Ответ

0 голосов
/ 10 марта 2020

Вы можете загрузить данные как textFile, создать класс case для person и проанализировать каждую json строку в экземпляр Person, используя json4s или gson, а затем создать Dataframe следующим образом:

case class Person(name: String, age: Int)
val jsons = spark.read.textFile("/my/input")
val persons = jsons.map{json => toPerson(json) //instead of 'toPerson' actually parse with json4s or gson to return Person instance}
val df = sqlContext.createDataFrame(persons)

Deserialize json к классу дел с использованием json4s:
https://commitlogs.com/2017/01/14/serialize-deserialize-json-with-json4s-in-scala/

десериализации json к классу дел с использованием gson:
https://alvinalexander.com/source-code/scala/scala-case-class-gson-json-object-deserialization-and-scalatra

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...