Я работаю с dstream из kafka, который выглядит как запись ниже. Я изо всех сил пытался получить правильную настройку схемы с вложенными полями JSON. Вот пример того, что я делаю. Что мне не хватает, так это способности просто получить фактическое значение, а не массив или тип rdd. Ценю любую помощь.
{"Source":"10.30.110.45:42757","Telemetry":{"node_id_str":"ASR9006","subscription_id_str":"qos","encoding_path":"Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/data-rate","collection_id":30905218,"collection_start_time":1524503864744,"msg_timestamp":1524503864744,"collection_end_time":1524503864746},"Rows":[{"Timestamp":1524503864746,"Keys":{"interface-name":"Bundle-Ether56"},"Content":{"bandwidth":40000000,"input-data-rate":4300587,"input-load":27,"input-packet-rate":5375721,"load-interval":0,"output-data-rate":12,"output-load":0,"output-packet-rate":5,"peak-input-data-rate":0,"peak-input-packet-rate":0,"peak-output-data-rate":0,"peak-output-packet-rate":0,"reliability":255}}]}
и код выглядит следующим образом:
val schema_array = StructType (Array(
StructField("Source",StringType),
StructField("Telemetry",StructType(Array(
StructField("collection_end_time",LongType),
StructField("collection_id",LongType),
StructField("collection_start_time",LongType),
StructField("encoding_path",StringType),
StructField("msg_timestamp",LongType),
StructField("node_id_str",StringType),
StructField("subscription_id_str",StringType)
))),
StructField("Rows",ArrayType(StructType(Array(
StructField("Timestamp",LongType),
StructField("Keys",StructType(Array(
StructField("interface-name",StringType)))),
StructField("Content",StructType(Array(
StructField("bandwidth",LongType),
StructField("input-data-rate",LongType),
StructField("input-load",LongType),
StructField("input-packet-rate",LongType),
StructField("load-interval",LongType),
StructField("output-data-rate",LongType),
StructField("output-load",LongType),
StructField("output-packet-rate",LongType),
StructField("peak-input-data-rate",LongType),
StructField("peak-input-packet-rate",LongType),
StructField("peak-output-data-rate",LongType),
StructField("peak-output-packet-rate",LongType),
StructField("reliability",LongType))))))))))
stream.foreachRDD { (rdd, time) =>
val data = rdd.map (record => record.value)
val jsonData = spark.read.schema(schema_array).json(data)
val result = jsonData.select("Rows.Keys.interface-name")
result.show()
Мой результат:
+----------------+
| interface-name|
+----------------+
|[Bundle-Ether56]|
+----------------+
Ожидаемый результат:
+----------------+
| interface-name|
+----------------+
| Bundle-Ether56 |
+----------------+
`