Сравните два столбца в кадре данных и найдите скорость изменения значений - PullRequest
0 голосов
/ 15 октября 2019

Я пытаюсь сравнить два столбца в кадре данных и выяснить скорость изменения значения. Я написал UDF для достижения этой цели, но получил ошибку при выполнении.

Ниже приведена структура данных в кадре данных.

+------------+-------------+-----------+------+
| NUM_ID     | TIME        |PREVIOUS_SG1|SG1_V|
+------------+-------------+-----------+------+
|XXXXX01     |1570167499000|  null     |79.0  |
|XXXXX01     |1570167502000|   79.0    |88.0  |
|XXXXX01     |1570167503000|  88.0     |99.0  |
|XXXXX01     |1570179810000|  99.0     |null  |
|XXXXX01     |1570179811000|  null     |100.0 |

Ниже приведена схема для этого кадра данных.

scala> castDF.printSchema
root
 |-- NUM_ID: string (nullable = true)
 |-- TIME: long (nullable = true)
 |-- PREVIOUS_SG1: double (nullable = true)
 |-- SG1_V: double (nullable = true)

Ниже написано UDF.

def UDF_D:UserDefinedFunction=udf((PREV: Double,CURR: Double)=>{
  if(PREV != null || PREV !=0){
  val out = ((CURR-PREV)/PREV)*100
  out
  }})

и код scala для вызова UDF

val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))

При выполнении я получаю ошибку ниже.

scala> val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))
java.lang.UnsupportedOperationException: Schema for type AnyVal is not supported

Нужно ли выполнять приведение к типу UDF или нулевые значения вызывают проблему? Я надеюсь, что передаю значение Double и не имею дело с любым другим типом.

Ответы [ 2 ]

1 голос
/ 15 октября 2019

вам не нужен udf для этого

df.select(when(('PREV.isNull || 'CURR === 0),  (('CURR-'PREV)/'PREV)*100).otherwise(0))

и как функция

 def compareCols(PREV: Column, CURR: Column): Column = {
    when((PREV.isNull || CURR === 0),  ((CURR-PREV)/PREV)*100).otherwise(0)
  }

 val diffDF = df.withColumn("SG1_DIFF", compareCols('PREV,'CURR))
1 голос
/ 15 октября 2019

Нет необходимости выполнять преобразование для вызова UDF, однако UDF и тип столбца должны быть синхронизированы. Также нулевые значения не вызывают проблемы.

Проблема в UDF, UDF всегда должна возвращать значение. Добавьте условие else в UDF, когда входные данные равны нулю или 0;

def UDF_D: UserDefinedFunction = udf((PREV: Double, CURR: Double) => {
    if (PREV != null || PREV != 0 || CURR != null || CURR != 0) {
      val out = ((CURR - PREV) / PREV) * 100
      out
    } else 0
})
...