spark udf возвращает тот же элемент, который выдает ошибку для массива структуры - PullRequest
0 голосов
/ 04 декабря 2018

Моя версия Spark 2.1.0.Я просто делаю фиктивную операцию над массивом, т.е. просто возвращаю его обратно с приведенным ниже определением udf.Но это не работает! /

val df = spark.read.format("csv").load("trans.txt").toDF("id", "dt", "amt")
val df2 = df.groupBy("id").agg(collect_list(struct('dt,'amt)).as("trans_vec"))
df2.show(false)
df2.printSchema()

def gen_rows(x:Seq[(String,String)]):Seq[(String,String)]={
  x
}
val udf_gen_rows = udf( gen_rows(_:Seq[(String,String)]):Seq[(String,String)] )

df2.withColumn("row_number",udf_gen_rows('trans_vec)).show(false)

Выдает приведенную ниже ошибку

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(trans_vec)' due to data type mismatch: argument 1 requires array<struct<_1:string,_2:string>> type, however, '`trans_vec`' is of array<struct<dt:string,amt:string>> type.;;

Как это исправить?.

1 Ответ

0 голосов
/ 04 декабря 2018

Это работает, если приведенный ниже код заменен с

val df2 = df.groupBy("id").agg(collect_list(struct('dt,'amt)).as("trans_vec"))

на

val df2 = df.groupBy("id").agg(collect_list(struct('dt.as("_1"),'amt.as("_2")).as("trans_vec"))

Похоже, UDF просто присваивает _1, _2 .. и так далее для массива структурыперешел на это.Позже вы можете переименовать их, используя select ()

...