Кажется, что происходит изменение поведения между 1.6 и 2.3 при кэшировании Dataframe и сохранении его как временной таблицы.В 1.6 следующий код выполняется printUDF
один раз.Эквивалентный код в 2.3 (или даже такой же, как есть) выполняет его дважды.
val rdd = context.parallelize(Seq(1, 2, 3)).map(Row(_))
val schema = StructType(StructField("num", IntegerType) :: Nil)
val df1 = sql.createDataFrame(rdd, schema)
df1.registerTempTable("data_table")
sql.udf.register("printUDF", (x:Int) => {print(x)
x
})
val df2 = sql.sql("select printUDF(num) result from data_table").cache()
df2.collect() //execute cache
df2.registerTempTable("cached_table")
val df3 = sql.sql("select result from cached_table")
df3.collect()
1.6 печатает 123, а 2.3 печатает 123123, таким образом оценивая кадр данных дважды.
Удалось преодолеть, пропустиввременная таблица и выбор непосредственно из кэшированного фрейма данных, но было интересно, если это ожидаемое поведение / известная проблема.
ОБНОВЛЕНИЕ: Согласно https://issues.apache.org/jira/browse/SPARK-26510, выглядит как ошибка.