Рассчитать минимальное и максимальное значение из набора значений после применения условия к столбцу - PullRequest
0 голосов
/ 08 октября 2018

У меня есть датафрейм (df), который выглядит следующим образом

col1 col2 col3
a     20    0    
a     21    1    
a     22    0    
a     23    1     
a     24    0    
a     25    1    
b     20    0    
b     21    0    
b     22    1    
b     23    1     
b     24    0    
b     25    1    

Я хочу вычислить минимальное и максимальное значение col2 в последних 5 строках, где col3 = 1, для каждой строки в кадре данных,Итак, чтобы вычислить мой minimin / максимум, строка должна иметь col3 = 1.

Желаемый вывод

col1 col2 col3  minLast5 maxLast5
a     20    0    0         0
a     21    1    0         0
a     22    0    21        21
a     23    1    21        21
a     24    0    21        23
a     25    1    21        23
b     20    0    0         0
b     21    0    0         0
b     22    1    0         0
b     23    1    22        22
b     24    0    22        23
b     25    1    22        23

Я пробовал следующий код

df
.withColumn("minLast5", when($"col3">0, min("col2").over(Window
                                                    .partitionBy($"col1")
                                                    .orderBy($"col2")
                                                    .rangeBetween(-5,-1))
                   .otherwise(0))
.withColumn("maxLast5", when($"col3">0, max("col2").over(Window
                                                    .partitionBy($"col1")
                                                    .orderBy($"col2")
                                                    .rangeBetween(-5,-1))
                   .otherwise(0))

код выше дает неправильный вывод.Это дает мне минимальное / максимальное значение col 2 для строк, где col3 равно 1.

Есть идеи, как мне это решить?

Заранее спасибо!

Ответы [ 2 ]

0 голосов
/ 08 октября 2018

Проверьте это.Оптимизация приветствуется!

scala> val df = Seq(
     | ("a",20,0    ),
     | ("a",21,1    ),
     | ("a",22,0    ),
     | ("a",23,1     ),
     | ("a",24,0    ),
     | ("a",25,1    ),
     | ("b",20,0    ),
     | ("b",21,0    ),
     | ("b",22,1    ),
     | ("b",23,1     ),
     | ("b",24,0    ),
     | ("b",25,1  )
     | ).toDF("col1","col2","col3")
df: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]

scala> val df2 =  df.withColumn("col3list", collect_list("col3")over(Window.partitionBy($"col1").orderBy($"col2").rangeBetween(-5,-1))).withColumn("col2list",collect_list("col2")over(Window.partitionBy($"col1").orderBy($"col2").rangeBetween(-5,-1)))
df2: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 3 more fields]

scala> def col3_max_min(x:String,y:String):Map[Int,Int]=
     | {
     | if(x.contains(","))
     | {
     |   val x1 = x.split(",").map(_.trim).map(_.toInt)
     |   val y1 = y.split(",").map(_.trim).map(_.toInt)
     |   val p = y1.zip(x1).filter(_._2 > 0 ).toMap
     |    if ( p.isEmpty )  Map(0->0) else p
     | }
     | else
     | return Map(0->0)
     | }
col3_max_min: (x: String, y: String)Map[Int,Int]

scala> val myudfcol3 = udf( col3_max_min(_:String,_:String):Map[Int,Int] )
myudfcol3: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(IntegerType,IntegerType,false),Some(List(StringType, StringType)))

scala> val df3 = df2.withColumn("max_min",myudfcol3( concat_ws(",",col("col3list")), concat_ws(",",col("col2list"))))
df3: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 4 more fields]

scala> val df4 = df3.withColumn("maxx",sort_array(map_keys(col("max_min")))(size(col("max_min"))-1)).withColumn("minn",sort_array(map_keys(col("max_min")))(0))
df4: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 6 more fields]

scala>  df4.select('col1,'col2,'col3,'minn,'maxx).show(false)
+----+----+----+----+----+
|col1|col2|col3|minn|maxx|
+----+----+----+----+----+
|b   |20  |0   |0   |0   |
|b   |21  |0   |0   |0   |
|b   |22  |1   |0   |0   |
|b   |23  |1   |22  |22  |
|b   |24  |0   |22  |23  |
|b   |25  |1   |22  |23  |
|a   |20  |0   |0   |0   |
|a   |21  |1   |0   |0   |
|a   |22  |0   |21  |21  |
|a   |23  |1   |21  |21  |
|a   |24  |0   |21  |23  |
|a   |25  |1   |21  |23  |
+----+----+----+----+----+


scala>
0 голосов
/ 08 октября 2018

Условие «когда» может быть включено в функции min / max:

val df = List(
  ("a", 20, 0),
  ("a", 21, 1),
  ("a", 22, 0),
  ("a", 23, 1),
  ("a", 24, 0),
  ("a", 25, 1),
  ("b", 20, 0),
  ("b", 21, 0),
  ("b", 22, 1),
  ("b", 23, 1),
  ("b", 24, 0),
  ("b", 25, 1)
).toDF("col1", "col2", "col3")

val window = Window.partitionBy($"col1")
  .orderBy($"col2")
  .rangeBetween(-5, -1)

val result = df
  .withColumn("minLast5", min(when($"col3" === 1, $"col2").otherwise(lit(null))).over(window))
  .withColumn("maxLast5", max(when($"col3" === 1, $"col2").otherwise(lit(null))).over(window))
  // replace null with 0
  .withColumn("minLast5", when($"minLast5".isNull, 0).otherwise($"minLast5"))
  .withColumn("maxLast5", when($"maxLast5".isNull, 0).otherwise($"maxLast5"))

result.show(false)

Выход:

+----+----+----+--------+--------+
|col1|col2|col3|minLast5|maxLast5|
+----+----+----+--------+--------+
|a   |20  |0   |0       |0       |
|a   |21  |1   |0       |0       |
|a   |22  |0   |21      |21      |
|a   |23  |1   |21      |21      |
|a   |24  |0   |21      |23      |
|a   |25  |1   |21      |23      |
|b   |20  |0   |0       |0       |
|b   |21  |0   |0       |0       |
|b   |22  |1   |0       |0       |
|b   |23  |1   |22      |22      |
|b   |24  |0   |22      |23      |
|b   |25  |1   |22      |23      |
+----+----+----+--------+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...