Установить буквенное значение над окном, если условие подходит Spark Scala - PullRequest
1 голос
/ 25 сентября 2019

Мне нужно проверить условие в окне: - Если столбец IND_DEF равен 20, я хочу изменить значение премии столбца для окна, к которому принадлежит этот регистр, и установить его равным 1.

Мой начальный Dataframe выглядит следующим образом:

+--------+----+-------+-----+-------+
|policyId|name|premium|state|IND_DEF|
+--------+----+-------+-----+-------+
|       1|  BK|   null|   KT|     40|
|       1|  AK|    -31| null|     30|
|       1|  VZ|   null|   IL|     20|
|       2|  VK|     32|   LI|      7|
|       2|  CK|     25|  YNZ|     10|
|       2|  CK|      0| null|      5|
|       2|  VK|     30|   IL|     25|
+--------+----+-------+-----+-------+

И я хочу добиться этого:

+--------+----+-------+-----+-------+
|policyId|name|premium|state|IND_DEF|
+--------+----+-------+-----+-------+
|       1|  BK|      1|   KT|     40|
|       1|  AK|      1| null|     30|
|       1|  VZ|      1|   IL|     20|
|       2|  VK|     32|   LI|      7|
|       2|  CK|     25|  YNZ|     10|
|       2|  CK|      0| null|      5|
|       2|  VK|     30|   IL|     25|
+--------+----+-------+-----+-------+

Я пытаюсь следующий код, но не работает ...

val df_946 = Seq [(Int, String, Integer, String, Int)]((1,"VZ",null,"IL",20),(1, "AK", -31,null,30),(1,"BK", null,"KT",40),(2,"CK",0,null,5),(2,"CK",25,"YNZ",10),(2,"VK",30,"IL",25),(2,"VK",32,"LI",7)).toDF("policyId", "name", "premium", "state","IND_DEF").orderBy("policyId")

val winSpec = Window.partitionBy("policyId").orderBy("policyId")

val df_947 = df_946.withColumn("premium",when(col("IND_DEF") === 20,lit(1).over(winSpec)).otherwise(col("premium")))

1 Ответ

1 голос
/ 25 сентября 2019

Вы можете сгенерировать массив IND_DEF значений через collect_list для каждого раздела окна и воссоздать столбец premium на основе условия array_contains:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, None, 40),
  (1, Some(-31), 30),
  (1, None, 20),
  (2, Some(32), 7),
  (2, Some(30), 10)
).toDF("policyId", "premium", "IND_DEF")

val win = Window.partitionBy($"policyId")

df.
  withColumn("indList", collect_list($"IND_DEF").over(win)).
  withColumn("premium", when(array_contains($"indList", 20), 1).otherwise($"premium")).
  drop($"indList").
  show
// +--------+-------+-------+
// |policyId|premium|IND_DEF|
// +--------+-------+-------+
// |       1|      1|     40|
// |       1|      1|     30|
// |       1|      1|     20|
// |       2|     32|      7|
// |       2|     30|     10|
// +--------+-------+-------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...