Передача всей строки в качестве аргумента для запуска udf через фрейм данных spark - создает исключение AnalysisException - PullRequest
0 голосов
/ 20 марта 2019

Я пытаюсь передать всю строку в spark udf вместе с несколькими другими аргументами. Я не использую spark sql, а использую dataframe withColumn api , но получаю следующее исключение:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s) col3#9 missing from col1#7,col2#8,col3#13 in operator !Project [col1#7, col2#8, col3#13, UDF(col3#9, col2, named_struct(col1, col1#7, col2, col2#8, col3, col3#9)) AS contcatenated#17]. Attribute(s) with the same name appear in the operation: col3. Please check if the right attribute(s) are used.;;

Вышеуказанное исключение можно повторить, используя следующий код:

    addRowUDF() // call invokes

    def addRowUDF() {
        import org.apache.spark.SparkConf
        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().config(new SparkConf().set("master", "local[*]")).appName(this.getClass.getSimpleName).getOrCreate()

        import spark.implicits._
        val df = Seq(
          ("a", "b", "c"),
          ("a1", "b1", "c1")).toDF("col1", "col2", "col3")
        execute(df)
      }

  def execute(df: org.apache.spark.sql.DataFrame) {

    import org.apache.spark.sql.Row
    def concatFunc(x: Any, y: String, row: Row) = x.toString + ":" + y + ":" + row.mkString(", ")

    import org.apache.spark.sql.functions.{ udf, struct }

    val combineUdf = udf((x: Any, y: String, row: Row) => concatFunc(x, y, row))

    def udf_execute(udf: String, args: org.apache.spark.sql.Column*) = (combineUdf)(args: _*)

    val columns = df.columns.map(df(_))

    val df2 = df.withColumn("col3", lit("xxxxxxxxxxx"))

    val df3 = df2.withColumn("contcatenated", udf_execute("uudf", df2.col("col3"), lit("col2"), struct(columns: _*)))

    df3.show(false)
  }

вывод должен быть:

+----+----+-----------+----------------------------+
|col1|col2|col3       |contcatenated               |
+----+----+-----------+----------------------------+
|a   |b   |xxxxxxxxxxx|xxxxxxxxxxx:col2:a, b, c    |
|a1  |b1  |xxxxxxxxxxx|xxxxxxxxxxx:col2:a1, b1, c1 |
+----+----+-----------+----------------------------+

1 Ответ

1 голос
/ 20 марта 2019

Это происходит потому, что вы ссылаетесь на столбец, которого больше нет в области видимости. Когда вы звоните:

val df2 = df.withColumn("col3", lit("xxxxxxxxxxx"))

затеняет исходный столбец col3, эффективно делая доступными предыдущие столбцы с тем же именем. Даже если это не так, скажем, после:

val df2 = df.select($"*", lit("xxxxxxxxxxx") as "col3")

новый col3 будет неоднозначным и неотличимым по имени от того, который определен *.

Таким образом, для достижения необходимого результата вам нужно использовать другое имя:

val df2 = df.withColumn("col3_", lit("xxxxxxxxxxx"))

и затем измените оставшуюся часть кода соответствующим образом:

df2.withColumn(
  "contcatenated", 
  udf_execute("uudf", df2.col("col3_") as "col3", 
  lit("col2"), struct(columns: _*))
).drop("_3")

Если логика так же проста, как в примере, вы, конечно, можете просто встроить вещи:

df.withColumn(
  "contcatenated", 
  udf_execute("uudf", lit("xxxxxxxxxxx") as "col3", 
  lit("col2"), struct(columns: _*))
).drop("_3")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...