Массив JSON в Dataframe в Spark, полученный Kafka - PullRequest
0 голосов
/ 16 декабря 2018

Я пишу приложение Spark в Scala, используя Spark Structured Streaming, которое получает некоторые данные, отформатированные в стиле JSON от Кафки.Это приложение может получать как один, так и несколько объектов JSON, отформатированных таким образом:

[{"key1":"value1","key2":"value2"},{"key1":"value1","key2":"value2"},...,{"key1":"value1","key2":"value2"}]

Я попытался определить тип StructType:

var schema = StructType(
                  Array(
                        StructField("key1",DataTypes.StringType),
                        StructField("key2",DataTypes.StringType)
             ))

Но это не работает.Фактический код для анализа JSON:

var data = (this.stream).getStreamer().load()
  .selectExpr("CAST (value AS STRING) as json")
  .select(from_json($"json",schema=schema).as("data"))

Я хотел бы получить объекты JSON в кадре данных, например

+----------+---------+
|      key1|     key2|
+----------+---------+
|    value1|   value2|
|    value1|   value2|
        ........
|    value1|   value2|
+----------+---------+

Кто-нибудь может мне помочь, пожалуйста?Спасибо!

1 Ответ

0 голосов
/ 23 декабря 2018

Поскольку входящая строка имеет значение Array из JSON, один из способов - написать UDF для анализа Array, а затем разобрать проанализированный Array.Ниже приведен полный код с объяснением каждого шага.Я написал его для пакета, но то же самое можно использовать для потоковой передачи с минимальными изменениями.

object JsonParser{

  //case class to parse the incoming JSON String
  case class JSON(key1: String, key2: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.
      builder().
      appName("JSON").
      master("local").
      getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    //sample JSON array String coming from kafka
    val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")

    //UDF to parse JSON array String
    val jsonConverter = udf { jsonString: String =>
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue(jsonString, classOf[Array[JSON]])
    }

    val df = str.toDF("json") //json String column
      .withColumn("array", jsonConverter($"json")) //parse the JSON Array
      .withColumn("json", explode($"array")) //explode the Array
      .drop("array") //drop unwanted columns
      .select("json.*") //explode the JSON to separate columns

    //display the DF
    df.show()
    //+------+------+
    //|  key1|  key2|
    //+------+------+
    //|value1|value2|
    //|value3|value4|
    //+------+------+

  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...