Spark создает новый столбец на основе сопоставленного значения существующего столбца - PullRequest
1 голос
/ 25 июня 2019

Я пытаюсь отобразить значения одного столбца в моем фрейме данных на новое значение и поместить его в новый столбец, используя UDF, но я не могу заставить UDF принять параметр, который не является также столбцом,Например, у меня есть фрейм данных dfOriginial, подобный этому:

+-----------+-----+
|high_scores|count|
+-----------+-----+
|          9|    1|
|         21|    2|
|         23|    3|
|          7|    6|
+-----------+-----+

И я пытаюсь получить представление о бине, в которое попадает числовое значение, поэтому я могу составить список бинов, например так:

case class Bin(binMax:BigDecimal, binWidth:BigDecimal) {
    val binMin = binMax - binWidth

    // only one of the two evaluations can include an  "or=", otherwise a value could fit in 2 bins
    def fitsInBin(value: BigDecimal): Boolean = value > binMin && value <= binMax

    def rangeAsString(): String = {
        val sb = new StringBuilder()
        sb.append(trimDecimal(binMin)).append(" - ").append(trimDecimal(binMax))
        sb.toString()
    }
}

И затем я хочу преобразовать мой старый фрейм данных таким образом, чтобы сделать dfBin:

+-----------+-----+---------+
|high_scores|count|bin_range|
+-----------+-----+---------+
|          9|    1| 0 - 10  |
|         21|    2| 20 - 30 |
|         23|    3| 20 - 30 |
|          7|    6| 0 - 10  |
+-----------+-----+---------+

, чтобы я мог в конечном итоге получить количество экземпляров бинов поВызов .groupBy("bin_range").count().

Я пытаюсь сгенерировать dfBin с помощью функции withColumn с UDF.

Вот код с UDF, который я пытаюсь использовать:

val convertValueToBinRangeUDF = udf((value:String, binList:List[Bin]) => {
    val number = BigDecimal(value)
    val bin = binList.find( bin => bin.fitsInBin(number)).getOrElse(Bin(BigDecimal(0), BigDecimal(0)))
    bin.rangeAsString()
})

val binList = List(Bin(10, 10), Bin(20, 10), Bin(30, 10), Bin(40, 10), Bin(50, 10))

val dfBin = dfOriginal.withColumn("bin_range", convertValueToBinRangeUDF(col("high_scores"), binList))

Но это дает мне несоответствие типов:

Error:type mismatch;
 found   : List[Bin]
 required: org.apache.spark.sql.Column
        val valueCountsWithBin = valuesCounts.withColumn(binRangeCol, convertValueToBinRangeUDF(col(columnName), binList))

Видя определение UDF, я думаю, что он должен нормально обрабатывать преобразование, но это явно не так, есть идеи?

Ответы [ 2 ]

2 голосов
/ 25 июня 2019

Проблема в том, что все параметры для UDF должны быть столбцового типа. Одним из решений было бы преобразовать binList в столбец и передать его в UDF, аналогично текущему коду.

Однако проще немного настроить UDF и превратить его в def. Таким образом, вы можете легко передавать другие данные, не относящиеся к типу столбцов:

def convertValueToBinRangeUDF(binList: List[Bin]) = udf((value:String) => {
  val number = BigDecimal(value)
  val bin = binList.find( bin => bin.fitsInBin(number)).getOrElse(Bin(BigDecimal(0), BigDecimal(0)))
  bin.rangeAsString()
})

Использование:

val dfBin = valuesCounts.withColumn("bin_range", convertValueToBinRangeUDF(binList)($"columnName"))
1 голос
/ 25 июня 2019

Попробуйте это -

scala> case class Bin(binMax:BigDecimal, binWidth:BigDecimal) {
     |     val binMin = binMax - binWidth
     |
     |     // only one of the two evaluations can include an  "or=", otherwise a value could fit in 2 bins
     |     def fitsInBin(value: BigDecimal): Boolean = value > binMin && value <= binMax
     |
     |    def rangeAsString(): String = {
     |       val sb = new StringBuilder()
     |       sb.append(binMin).append(" - ").append(binMax)
     |       sb.toString()
     |     }
     | }
defined class Bin


scala> val binList = List(Bin(10, 10), Bin(20, 10), Bin(30, 10), Bin(40, 10), Bin(50, 10))
binList: List[Bin] = List(Bin(10,10), Bin(20,10), Bin(30,10), Bin(40,10), Bin(50,10))


scala> spark.udf.register("convertValueToBinRangeUDF", (value: String) => {
     |     val number = BigDecimal(value)
     |     val bin = binList.find( bin => bin.fitsInBin(number)).getOrElse(Bin(BigDecimal(0), BigDecimal(0)))
     |     bin.rangeAsString()
     | })
res13: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))


//-- Testing with one record

scala> val dfOriginal = spark.sql(s""" select "9" as `high_scores`, "1" as count """)
dfOriginal: org.apache.spark.sql.DataFrame = [high_scores: string, count: string]


scala> dfOriginal.createOrReplaceTempView("dfOriginal")

scala> val dfBin = spark.sql(s"""  select high_scores, count, convertValueToBinRangeUDF(high_scores) as bin_range from dfOriginal """)
dfBin: org.apache.spark.sql.DataFrame = [high_scores: string, count: string ... 1 more field]

scala> dfBin.show(false)
+-----------+-----+---------+
|high_scores|count|bin_range|
+-----------+-----+---------+
|9          |1    |0 - 10   |
+-----------+-----+---------+

Надеюсь, это поможет.

...