Перевод вашего кода в искру sql выражение :
val new_df = spark.sql("""
WITH temp AS(
SELECT *, explode(transactions) AS txn FROM df
)
SELECT first(id) id, collect_list(txn) AS TRANSACTIONS FROM temp WHERE IsValidUDF(txn) = TRUE GROUP BY id
""")
(просто замените first(id)
на first(.)
на каждый столбец, который вы хотите иметь в результирующий кадр данных.
Заранее убедитесь, что ваш udf зарегистрирован :
spark.udf.register("IsValidUDF", is_valid_udf)
Вот полный код с игрушечным примером :
import org.apache.spark.sql.Row
// Toy example
val df = Seq((0, List(66,1) ),(1, List(98, 2)),(2, List(90))).toDF("id", "transactions")
df.createOrReplaceTempView("df")
val is_valid_udf = udf((r: Int) => r > 50)
// register udf
spark.udf.register("IsValidUDF", is_valid_udf)
// query
val new_df = spark.sql("""
WITH temp AS(
SELECT *, explode(transactions) AS txn FROM df
)
SELECT first(id) id, collect_list(txn) AS TRANSACTIONS FROM temp WHERE IsValidUDF(txn) = TRUE GROUP BY id
""")
Вывод:
+---+------------+
| id|TRANSACTIONS|
+---+------------+
| 1| [98]|
| 2| [90]|
| 0| [66]|
+---+------------+
, который является исходным кадром данных с удаленными транзакциями> 50.