Создание правильной схемы для потокового RDD - PullRequest
0 голосов
/ 12 мая 2018

Я работаю с dstream из kafka, который выглядит как запись ниже. Я изо всех сил пытался получить правильную настройку схемы с вложенными полями JSON. Вот пример того, что я делаю. Что мне не хватает, так это способности просто получить фактическое значение, а не массив или тип rdd. Ценю любую помощь.

{"Source":"10.30.110.45:42757","Telemetry":{"node_id_str":"ASR9006","subscription_id_str":"qos","encoding_path":"Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/data-rate","collection_id":30905218,"collection_start_time":1524503864744,"msg_timestamp":1524503864744,"collection_end_time":1524503864746},"Rows":[{"Timestamp":1524503864746,"Keys":{"interface-name":"Bundle-Ether56"},"Content":{"bandwidth":40000000,"input-data-rate":4300587,"input-load":27,"input-packet-rate":5375721,"load-interval":0,"output-data-rate":12,"output-load":0,"output-packet-rate":5,"peak-input-data-rate":0,"peak-input-packet-rate":0,"peak-output-data-rate":0,"peak-output-packet-rate":0,"reliability":255}}]}

и код выглядит следующим образом:

val schema_array = StructType (Array(
        StructField("Source",StringType),
        StructField("Telemetry",StructType(Array(
          StructField("collection_end_time",LongType),
          StructField("collection_id",LongType),
          StructField("collection_start_time",LongType),
          StructField("encoding_path",StringType),
          StructField("msg_timestamp",LongType),
          StructField("node_id_str",StringType),
          StructField("subscription_id_str",StringType)
        ))),
          StructField("Rows",ArrayType(StructType(Array(
            StructField("Timestamp",LongType),
            StructField("Keys",StructType(Array(
              StructField("interface-name",StringType)))),
            StructField("Content",StructType(Array(
            StructField("bandwidth",LongType),
            StructField("input-data-rate",LongType),
            StructField("input-load",LongType),
            StructField("input-packet-rate",LongType),
            StructField("load-interval",LongType),
            StructField("output-data-rate",LongType),
            StructField("output-load",LongType),
            StructField("output-packet-rate",LongType),
            StructField("peak-input-data-rate",LongType),
            StructField("peak-input-packet-rate",LongType),
            StructField("peak-output-data-rate",LongType),
            StructField("peak-output-packet-rate",LongType),
            StructField("reliability",LongType))))))))))

    stream.foreachRDD { (rdd, time)  =>
        val data = rdd.map (record => record.value)
        val jsonData = spark.read.schema(schema_array).json(data)

        val result = jsonData.select("Rows.Keys.interface-name")
        result.show()

Мой результат:

+----------------+
|  interface-name|
+----------------+
|[Bundle-Ether56]|
+----------------+

Ожидаемый результат:

+----------------+
|  interface-name|
+----------------+
| Bundle-Ether56 |
+----------------+

`

1 Ответ

0 голосов
/ 14 мая 2018

Поработав некоторое время, я обнаружил, что метод взрыва, похоже, сработал для того, что я хотел сделать. Я верю, что, поскольку я делаю forEach и получаю только одну запись за раз, я в безопасности.

import org.apache.spark.sql.functions.explode
  val result = jsonData.select(explode($"Rows.Keys.interface-name"))
    result.show()

Результат

+--------------+
|           col|
+--------------+ 
|Bundle-Ether56|
+--------------+
...