опция spark from_json accept_case_insensitive_properties Как настроить - PullRequest
0 голосов
/ 23 октября 2019

jsonStrDF ключ json Скажите ему, чтобы он игнорировал регистр.

val mapOption = Map("accept_case_insensitive_properties" -> "true")
val newDF = jsonStrDF.withColumn("data_col", from_json(col("msg"), structSeqSchme, mapOption))

Как вы устанавливаете его, как Jackson, как вы устанавливаете его в искре? спасибо

com.fasterxml.jackson.databind.MapperFeature
   ACCEPT_CASE_INSENSITIVE_PROPERTIES(false),

код объяснения

 def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").
      enableHiveSupport().getOrCreate()
    spark.sqlContext.setConf("spark.sql.caseSensitive", "false")
    import spark.implicits._
    //hive table 存进行时会自动转小写
    val hivetable =  """{"deliverysystype":"dms","aaaa":"dms","orderid":"B0001-N103-000-005882-RL3AI2RWCP","storeid":"N103"}"""
    val hiveDF = Seq(hivetable).toDF("msg")
    val rdd = hiveDF.rdd.map(_.getString(0))
    rdd.toDS().show(false)
    val jsonDataDF = spark.read.json(rdd.toDS())
    val jsonstr =
      """{"data":{"deliverySysType":"dms","orderId":"B0001-N103-000-005882-RL3AI2RWCP","storeId":"N103"},"accessKey":"f9d069861dfb1678","actionName":"candao.rider.getDeliveryInfo","timestamp":1571587522000,"ticket":"B0001.N127.FBDDS2.20191021000522156","serviceType":"delivery","sign":"fa0239c75e065cf43d0a4040665578ba" }"""
    val jsonStrDF = Seq(jsonstr).toDF("msg")
    jsonStrDF.show(false)
    val structSeqSchme = StructType(Seq(StructField("data", jsonDataDF.schema, true),
      StructField("accesskey", StringType, true),//这里应该 accessKey
      StructField("actionName", StringType, true)))
    //hive col name lower case, json data key capital and small letter,Take less than value
    val mapOption = Map("accept_case_insensitive_properties" -> "true")//I'm not doing anything here, but I don't know how to set a value, right?
    val newDF = jsonStrDF.withColumn("data_col", from_json(col("msg"), structSeqSchme, mapOption))
    newDF.show(false)
    newDF.printSchema()
    newDF.select($"data_col.accessKey", $"data_col.actionName", $"data_col.data.*").show(false)
    //+---------+----------------------------+----+---------------+-------+-------+
    //|accessKey|actionName                  |aaaa|deliverysystype|orderid|storeid|
    //+---------+----------------------------+----+---------------+-------+-------+
    //|null     |candao.rider.getDeliveryInfo|null|null           |null   |null   |
    //+---------+----------------------------+----+---------------+-------+-------+
  }
...