Я подключаюсь к mongodb через spark, создаю сеанс spark и печатаю схему
val data1 = spark.read.format ("com.mongodb.spark. sql .DefaultSource"). Load ( )
data1.printSchema ()
root
| - _id: struct (nullable = true)
| - payLoad: string (nullable = true)
| - идентификатор пользователя: строка (nullable = true)
| - состояние: строка (nullable = true)
Источник ввода json : {"_id": ObjectId ("12"), "stateCode": "CH", "userid": "f9ade278-f880-4db8-8db31", "payLoad": {"a": [], "b" : [], "c": [{"d": "16743638", "es": [{"name": "Cd", "value": "1"}, {"name": "dr "," value ":" No "}, {" name ":" date "," value ":" 2020-01-24T00: 00: 00 "}, {" name ":" originalAddDate "," value ": "2020-01-24T00: 00: 00"}]}}
Моя полезная нагрузка - это объектный тип данных в mon go, но он приходит как строка в искре, когда я преобразовываю полезную нагрузку в rdd, я получаю ее с вложенной структурой.
var r dd = data1.select ("payLoad"). rdd.map (_. getString (0))
val df = spark.read. json (rdd)
df.printSchema ()
ИД пользователя и состояние являются ключами для документа json, так как я выбираю только полезную нагрузку для rdd и выравниваю ее, я не могу добавить идентификатор пользователя и состояние в фрейм данных df.
Мой ожидаемый вывод:
Я хочу добавить идентификатор пользователя и состояние в df в моем случае выше
Может кто-нибудь поделиться своими мыслями о том, как этого можно достичь?