перевести с вызовов функции spark на SQL - PullRequest
0 голосов
/ 18 марта 2020

У меня есть набор данных со схемой ниже.

root
 |-- acct_id: long (nullable = true)
 |-- firm_bnkg_id: integer (nullable = true)
 |-- tagged: long (nullable = true)
 |-- transactions: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- mo_yr_buckt: string (nullable = false)
 |    |    |-- acct_id: long (nullable = false)
 |    |    |-- eff_dt: date (nullable = true)
 |    |    |-- extn_txn_cd: string (nullable = true)
 |    |    |-- mntr_txn_am: double (nullable = true)
 |    |    |-- cr_dr_in: string (nullable = true)
 |    |    |-- txn_desc_tx: string (nullable = true)
 |    |    |-- txn_auth_dt: date (nullable = false)
 |    |    |-- txn_auth_ts: string (nullable = false)
 |    |    |-- tagged: long (nullable = true)
 |    |    |-- firm_bnkg_id: integer (nullable = false)
 |    |    |-- txn_pst_sq_nb: string (nullable = false)
 |    |    |-- pst_dt: integer (nullable = false)
 |-- prty_ol_prfl_id: long (nullable = true)
 |-- prod_cd: string (nullable = true)
 |-- acct_type_cd: string (nullable = true)
 |-- acct_state_cd: string (nullable = true)

Теперь я хочу изменить текущий код на оператор SQL. Текущий код выглядит следующим образом:

val result = ds.select(col("*"), explode(col("transactions")).as("txn"))
  .where("IsValidUDF(txn) = TRUE").groupBy("prty_ol_prfl_id")
  .agg(collect_list("txn").as("transactions"))

, который создает следующую схему:

root
 |-- acct_id: long (nullable = true)
 |-- firm_bnkg_id: integer (nullable = true)
 |-- tagged: long (nullable = true)
 |-- transactions: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- mo_yr_buckt: string (nullable = false)
 |    |    |-- acct_id: long (nullable = false)
 |    |    |-- eff_dt: date (nullable = true)
 |    |    |-- extn_txn_cd: string (nullable = true)
 |    |    |-- mntr_txn_am: double (nullable = true)
 |    |    |-- cr_dr_in: string (nullable = true)
 |    |    |-- txn_desc_tx: string (nullable = true)
 |    |    |-- txn_auth_dt: date (nullable = false)
 |    |    |-- txn_auth_ts: string (nullable = false)
 |    |    |-- tagged: long (nullable = true)
 |    |    |-- firm_bnkg_id: integer (nullable = false)
 |    |    |-- txn_pst_sq_nb: string (nullable = false)
 |    |    |-- pst_dt: integer (nullable = false)
 |-- prty_ol_prfl_id: long (nullable = true)
 |-- prod_cd: string (nullable = true)
 |-- acct_type_cd: string (nullable = true)
 |-- acct_state_cd: string (nullable = true)

IsValidUDF просто проверяет столбец, помеченный для определенных значений.

Любая помощь будет оценена. Спасибо

1 Ответ

0 голосов
/ 18 марта 2020

Перевод вашего кода в искру sql выражение :

val new_df = spark.sql("""
    WITH temp AS(
        SELECT *, explode(transactions) AS txn FROM df 
    )
    SELECT first(id) id, collect_list(txn) AS TRANSACTIONS FROM temp WHERE IsValidUDF(txn) = TRUE GROUP BY id 
""")

(просто замените first(id) на first(.) на каждый столбец, который вы хотите иметь в результирующий кадр данных.

Заранее убедитесь, что ваш udf зарегистрирован :

spark.udf.register("IsValidUDF", is_valid_udf)

Вот полный код с игрушечным примером :

import org.apache.spark.sql.Row

// Toy example
val df = Seq((0, List(66,1) ),(1, List(98, 2)),(2, List(90))).toDF("id", "transactions")
df.createOrReplaceTempView("df")
val is_valid_udf = udf((r: Int) => r > 50)

// register udf
spark.udf.register("IsValidUDF", is_valid_udf)
// query
val new_df = spark.sql("""
    WITH temp AS(
        SELECT *, explode(transactions) AS txn FROM df 
    )
    SELECT first(id) id, collect_list(txn) AS TRANSACTIONS FROM temp WHERE IsValidUDF(txn) = TRUE GROUP BY id 
""")

Вывод:

+---+------------+
| id|TRANSACTIONS|
+---+------------+
|  1|        [98]|
|  2|        [90]|
|  0|        [66]|
+---+------------+

, который является исходным кадром данных с удаленными транзакциями> 50.

...