Можно ли применять, когда другие функции внутри agg после groupBy? - PullRequest
0 голосов
/ 13 марта 2020

Недавно пытался применить функцию по умолчанию к агрегированным значениям, которые вычислялись, чтобы впоследствии мне не пришлось их повторно обрабатывать. Насколько я вижу, я получаю следующую ошибку.

Caused by: java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported

Из следующей функции.

    val defaultUDF: UserDefinedFunction = udf[Column, Column, Any](defaultFunction)
    def defaultFunction(col: Column, value: Any): Column = {
      when(col.equalTo(value), null).otherwise(col)
    }

И применяю ее следующим образом.

    val initialDF = Seq(
      ("a", "b", 1),
      ("a", "b", null),
      ("a", null, 0)
    ).toDF("field1", "field2", "field3")

    initialDF
      .groupBy("field1", "field2")
      .agg(
        defaultUDF(functions.count("field3"), lit(0)).as("counter") // exception thrown here
      )

Пытаюсь ли я сделать здесь черную магию c или мне чего-то не хватает?

1 Ответ

2 голосов
/ 15 марта 2020

Проблема в реализации вашего UserDefinedFunction:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

def defaultFunction(col: Column, value: Any): Column = {
  when(col.equalTo(value), null).otherwise(col)
}

val defaultUDF: UserDefinedFunction = udf[Column, Column, Any](defaultFunction)
// java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported
//   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
//   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
//   at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
//   at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
//   at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
//   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
//   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:720)
//   at org.apache.spark.sql.functions$.udf(functions.scala:3914)
//   ... 65 elided

Ошибка, которую вы получаете, в основном из-за того, что Spark не может отобразить тип возврата (то есть Column) вашего UserDefinedFunction defaultFunction для Spark DataType. Ваша defaultFunction должна принимать и возвращать Scala типы, которые соответствуют Spark DataType. Вы можете найти список поддерживаемых Scala типов здесь: https://spark.apache.org/docs/latest/sql-reference.html#data -типы

В любом случае вам не нужен UserDefinedFunction, если ваша функция принимает столбцы и возвращает Колонна. Для вашего варианта использования будет работать следующий код:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class Record(field1: String, field2: String, field3: java.lang.Integer)

val df = Seq(
  Record("a", "b", 1),
  Record("a", "b", null),
  Record("a", null, 0)
).toDS

df.show

// +------+------+------+
// |field1|field2|field3|
// +------+------+------+
// |     a|     b|     1|
// |     a|     b|  null|
// |     a|  null|     0|
// +------+------+------+

def defaultFunction(col: Column, value: Any): Column = {
  when(col.equalTo(value), null).otherwise(col)
}

df
.groupBy("field1", "field2")
.agg(defaultFunction(count("field3"), lit(0)).as("counter"))
.show

// +------+------+-------+                                                         
// |field1|field2|counter|
// +------+------+-------+
// |     a|     b|      1|
// |     a|  null|      1|
// +------+------+-------+
...