.rowsBetween (Window.unboundedPreceding, Window.unboundedFollowing) Ошибка Spark Scala - PullRequest
0 голосов
/ 28 сентября 2019

Здравствуйте. Я пытаюсь расширить последнее значение каждого окна до остальной части окна для столбца count, чтобы создать флаг, который распознает, является ли регистр последним значением окна.Я попробовал это так, но не получилось.

Пример DF:

val df_197 = Seq [(Int, Int, Int, Int)]((1,1,7,10),(1,10,4,300),(1,3,14,50),(1,20,24,70),(1,30,12,90),(2,10,4,900),(2,25,30,40),(2,15,21,60),(2,5,10,80)).toDF("policyId","FECMVTO","aux","IND_DEF").orderBy(asc("policyId"), asc("FECMVTO"))
df_197.show
+--------+-------+---+-------+
|policyId|FECMVTO|aux|IND_DEF|
+--------+-------+---+-------+
|       1|      1|  7|     10|
|       1|      3| 14|     50|
|       1|     10|  4|    300|
|       1|     20| 24|     70|
|       1|     30| 12|     90|
|       2|      5| 10|     80|
|       2|     10|  4|    900|
|       2|     15| 21|     60|
|       2|     25| 30|     40|
+--------+-------+---+-------+
val juntar_riesgo = 1
val var_entidad_2 = $"aux"

//Particionar por uno o dos campos en funcion del valor de la variable juntar_riesgo
//Se creará window_number_2 basado en este particionamiento
val winSpec = if(juntar_riesgo == 1) {
  Window.partitionBy($"policyId").orderBy($"FECMVTO")  
} else {
  Window.partitionBy(var_entidad_2,$"policyId").orderBy("FECMVTO")
}

val df_308 = df_307.withColumn("window_number", row_number().over(winSpec))
                   .withColumn("count", last("window_number",true) over (winSpec))
                   .withColumn("FLG_LAST_WDW", when(col("window_number") === col("count"),1).otherwise(lit(0))).show

Результат (количество столбцов мне должно быть 5 для всех элементов в 1-м разделе и 4 для всех элементов во 2-м разделе):

+--------+-------+---+-------+-------------+-----+------------+
|policyId|FECMVTO|aux|IND_DEF|window_number|count|FLG_LAST_WDW|
+--------+-------+---+-------+-------------+-----+------------+
|       1|      1|  7|     10|            1|    1|           1|
|       1|      3| 14|     50|            2|    2|           1|
|       1|     10|  4|    300|            3|    3|           1|
|       1|     20| 24|     70|            4|    4|           1|
|       1|     30| 12|     90|            5|    5|           1|
|       2|      5| 10|     80|            1|    1|           1|
|       2|     10|  4|    900|            2|    2|           1|
|       2|     15| 21|     60|            3|    3|           1|
|       2|     25| 30|     40|            4|    4|           1|
+--------+-------+---+-------+-------------+-----+------------+

Тогда я читаю, когда вы используете orderBy после предложения windowPartition, вы должны указать предложение .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) для достижения того, что мне нужно.Однако, когда я пытаюсь это сделать, я сталкиваюсь с этой ошибкой:

val juntar_riesgo = 1
val var_entidad_2 = $"aux"

//Particionar por uno o dos campos en funcion del valor de la variable juntar_riesgo
//Se creará window_number_2 basado en este particionamiento
val winSpec = if(juntar_riesgo == 1) {
  Window.partitionBy($"policyId").orderBy($"FECMVTO")  
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
} else {
  Window.partitionBy(var_entidad_2,$"policyId").orderBy("FECMVTO")
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
}

val df_198 = df_197.withColumn("window_number", row_number().over(winSpec))
                   .withColumn("count", last("window_number",true) over (winSpec))
                   .withColumn("FLG_LAST_WDW", when(col("window_number") === col("count"),1).otherwise(lit(0))).show
ERROR: org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()) must match the required frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$());

Спасибо за вашу помощь!

1 Ответ

1 голос
/ 28 сентября 2019

Вы не должны использовать last здесь, но max без без указания порядка:

val df_198 = df_197
  .withColumn("window_number", row_number().over(Window.partitionBy($"policyId").orderBy($"FECMVTO")))
  .withColumn("count", max("window_number") over (Window.partitionBy($"policyId")))
  .withColumn("FLG_LAST_WDW", when(col("window_number") === col("count"),1).otherwise(lit(0))).show


+--------+-------+---+-------+-------------+-----+------------+
|policyId|FECMVTO|aux|IND_DEF|window_number|count|FLG_LAST_WDW|
+--------+-------+---+-------+-------------+-----+------------+
|       1|      1|  7|     10|            1|    5|           0|
|       1|      3| 14|     50|            2|    5|           0|
|       1|     10|  4|    300|            3|    5|           0|
|       1|     20| 24|     70|            4|    5|           0|
|       1|     30| 12|     90|            5|    5|           1|
|       2|      5| 10|     80|            1|    4|           0|
|       2|     10|  4|    900|            2|    4|           0|
|       2|     15| 21|     60|            3|    4|           0|
|       2|     25| 30|     40|            4|    4|           1|
+--------+-------+---+-------+-------------+-----+------------+

Обратите внимание, что вы можете написать это короче, рассчитав row_number с нисходящим порядкоми затем возьмите row_number===1:

val df_198 = df_197
  .withColumn("FLG_LAT_WDW", when(row_number().over(Window.partitionBy($"policyId").orderBy($"FECMVTO".desc))===1,1).otherwise(0))
  .show
...