Spark Scala Window продлит результат до конца - PullRequest
1 голос
/ 26 сентября 2019

Я выставлю свою проблему на основе исходного кадра данных и того, которого я хочу достичь:

val df_997 = Seq [(Int, Int, Int, Int)]((1,1,7,10),(1,10,4,300),(1,3,14,50),(1,20,24,70),(1,30,12,90),(2,10,4,900),(2,25,30,40),(2,15,21,60),(2,5,10,80)).toDF("policyId","FECMVTO","aux","IND_DEF").orderBy(asc("policyId"), asc("FECMVTO"))
df_997.show
+--------+-------+---+-------+
|policyId|FECMVTO|aux|IND_DEF|
+--------+-------+---+-------+
|       1|      1|  7|     10|
|       1|      3| 14|     50|
|       1|     10|  4|    300|
|       1|     20| 24|     70|
|       1|     30| 12|     90|
|       2|      5| 10|     80|
|       2|     10|  4|    900|
|       2|     15| 21|     60|
|       2|     25| 30|     40|
+--------+-------+---+-------+

Представьте, что я разделил этот DF по столбцу policyId и создал столбец row_num на основечтобы лучше увидеть Windows:

val win = Window.partitionBy("policyId").orderBy("FECMVTO")

val df_998 = df_997.withColumn("row_num",row_number().over(win))
df_998.show
+--------+-------+---+-------+-------+
|policyId|FECMVTO|aux|IND_DEF|row_num|
+--------+-------+---+-------+-------+
|       1|      1|  7|     10|      1|
|       1|      3| 14|     50|      2|
|       1|     10|  4|    300|      3|
|       1|     20| 24|     70|      4|
|       1|     30| 12|     90|      5|
|       2|      5| 10|     80|      1|
|       2|     10|  4|    900|      2|
|       2|     15| 21|     60|      3|
|       2|     25| 30|     40|      4|
+--------+-------+---+-------+-------+

Теперь для каждого окна, если значение aux равно 4, я хочу установить значение столбца IND_DEF для этого регистра в столбец FEC_MVTO дляэтот регистр включается до конца окна.

В результате DF будет:

+--------+-------+---+-------+-------+
|policyId|FECMVTO|aux|IND_DEF|row_num|
+--------+-------+---+-------+-------+
|       1|      1|  7|     10|      1|
|       1|      3| 14|     50|      2|
|       1|    300|  4|    300|      3|
|       1|    300| 24|     70|      4|
|       1|    300| 12|     90|      5|
|       2|      5| 10|     80|      1|
|       2|    900|  4|    900|      2|
|       2|    900| 21|     60|      3|
|       2|    900| 30|     40|      4|
+--------+-------+---+-------+-------+

Спасибо за ваши предложения, так как я очень застрял здесь ...

Ответы [ 2 ]

0 голосов
/ 27 сентября 2019
#I believe below can be solution for your issue

Considering input_df is your input dataframe

//Step#1 - Filter rows with IND_DEF = 4 from input_df
val only_FECMVTO_4_df1 = input_df.filter($"IND_DEF" === 4)
//Step#2 - Filling FECMVTO value from  IND_DEF for the above result
val only_FECMVTO_4_df2 = only_FECMVTO_4_df1.withColumn("FECMVTO_NEW",$"IND_DEF").drop($"FECMVTO").withColumnRenamed("FECMVTO",$"FECMVTO_NEW")
//Step#3 - removing all the records from step#1 from input_df
val input_df_without_FECMVTO_4 = input_df.except(only_FECMVTO_4_df1)
//combining Step#2 output with output of Step#3
val final_df = input_df_without_FECMVTO_4.union(only_FECMVTO_4_df2)
0 голосов
/ 27 сентября 2019

Вот один из подходов: сначала left - присоедините DataFrame с его aux == 4 отфильтрованной версией, затем примените оконную функцию first для обратной засыпки null s с требуемыми IND_DEF значениями на раздел и, наконец, условновоссоздать столбец FECMVTO:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1,1,7,10), (1,10,4,300), (1,3,14,50), (1,20,24,70), (1,30,12,90), 
  (2,10,4,900), (2,25,30,40), (2,15,21,60), (2,5,10,80)
).toDF("policyId","FECMVTO","aux","IND_DEF")

val win = Window.partitionBy("policyId").orderBy("FECMVTO").
  rowsBetween(Window.unboundedPreceding, 0)

val df2 = df.
  select($"policyId", $"aux", $"IND_DEF".as("IND_DEF2")).
  where($"aux" === 4)

df.join(df2, Seq("policyId", "aux"), "left_outer").
  withColumn("IND_DEF3", first($"IND_DEF2", ignoreNulls=true).over(win)).
  withColumn("FECMVTO", coalesce($"IND_DEF3", $"FECMVTO")).
  show
// +--------+---+-------+-------+--------+--------+
// |policyId|aux|FECMVTO|IND_DEF|IND_DEF2|IND_DEF3|
// +--------+---+-------+-------+--------+--------+
// |       1|  7|      1|     10|    null|    null|
// |       1| 14|      3|     50|    null|    null|
// |       1|  4|    300|    300|     300|     300|
// |       1| 24|    300|     70|    null|     300|
// |       1| 12|    300|     90|    null|     300|
// |       2| 10|      5|     80|    null|    null|
// |       2|  4|    900|    900|     900|     900|
// |       2| 21|    900|     60|    null|     900|
// |       2| 30|    900|     40|    null|     900|
// +--------+---+-------+-------+--------+--------+

Столбцы IND_DEF2, IND_DEF3 сохраняются только для иллюстрации (и, безусловно, могут быть отброшены).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...