UDF с пользовательской функцией разнесения на фрейме искры - PullRequest
0 голосов
/ 17 апреля 2020

У меня есть фрейм данных с данными json, я пытаюсь польстить им, используя функцию более плоской и функции explode_outer, но поздно обнаружил, что наша производственная система имеет поддержку только для spark 2.1, поэтому я не могу использовать explode_outer функция, так что я создал функцию udf для функции разнесения здесь код для функции и разнесения внешней функции.

 private val parseJsonUDF = udf((dfe: DataFrame, columnsToExplode: List[String])=>
  {
    val arrayFields = dfe.schema.fields
      .map(field => field.name -> field.dataType)
      .collect { case (name: String, type_val: ArrayType) => (name, type_val.asInstanceOf[ArrayType]) }
      .toMap

    columnsToExplode.foldLeft(dfe) { (dataFrame, arrayCol) =>
      dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
        .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
    }

  })

И код, который использует udf

def flattenDataframe(df: DataFrame): DataFrame= {
      //getting all the fields from schema
      val fields = df.schema.fields
      val fieldNames = fields.map(x => x.name)
      //length shows the number of fields inside dataframe
      val length = fields.length
      for (i <- 0 to fields.length - 1) {
        val field = fields(i)
        val fieldtype = field.dataType
        val fieldName = field.name
        fieldtype match {
          case arrayType: ArrayType =>
            val fieldName1 = fieldName
            val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName1)
            //val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName1) as $fieldName1")
            val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"parseJsonUDF($df,$fieldName1) as $fieldName1")
            val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
            return flattenDataframe(explodedDf)

          case structType: StructType =>
            val childFieldnames = structType.fieldNames.map(childname => fieldName + "." + childname)
            val newfieldNames = fieldNames.filter(_ != fieldName) ++ childFieldnames
            val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", ""))))
            val explodedf = df.select(renamedcols: _*)
            return flattenDataframe(explodedf)
          case _ =>
        }
      }
      df
    }

При вызове функции я получаю эту ошибку.

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] is not supported

Данные

 val json_string = """{
                   "Total_Value": 3,
                   "Topic": "Example",
                   "values": [
                              {
                                "value1": "#example1",
                                "points": [
                                           [
                                           "123",
                                           "156"
                                          ]
                                    ],
                                "properties": {
                                 "date": "12-04-19",
                                 "model": "Model example 1"
                                    }
                                 },
                               {"value2": "#example2",
                                "points": [
                                           [
                                           "124",
                                           "157"
                                          ]
                                    ],
                                "properties": {
                                 "date": "12-05-19",
                                 "model": "Model example 2"
                                    }
                                 }
                              ]
                       }"""
...