Проверьте это.Оптимизация приветствуется!
scala> val df = Seq(
| ("a",20,0 ),
| ("a",21,1 ),
| ("a",22,0 ),
| ("a",23,1 ),
| ("a",24,0 ),
| ("a",25,1 ),
| ("b",20,0 ),
| ("b",21,0 ),
| ("b",22,1 ),
| ("b",23,1 ),
| ("b",24,0 ),
| ("b",25,1 )
| ).toDF("col1","col2","col3")
df: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]
scala> val df2 = df.withColumn("col3list", collect_list("col3")over(Window.partitionBy($"col1").orderBy($"col2").rangeBetween(-5,-1))).withColumn("col2list",collect_list("col2")over(Window.partitionBy($"col1").orderBy($"col2").rangeBetween(-5,-1)))
df2: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 3 more fields]
scala> def col3_max_min(x:String,y:String):Map[Int,Int]=
| {
| if(x.contains(","))
| {
| val x1 = x.split(",").map(_.trim).map(_.toInt)
| val y1 = y.split(",").map(_.trim).map(_.toInt)
| val p = y1.zip(x1).filter(_._2 > 0 ).toMap
| if ( p.isEmpty ) Map(0->0) else p
| }
| else
| return Map(0->0)
| }
col3_max_min: (x: String, y: String)Map[Int,Int]
scala> val myudfcol3 = udf( col3_max_min(_:String,_:String):Map[Int,Int] )
myudfcol3: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(IntegerType,IntegerType,false),Some(List(StringType, StringType)))
scala> val df3 = df2.withColumn("max_min",myudfcol3( concat_ws(",",col("col3list")), concat_ws(",",col("col2list"))))
df3: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 4 more fields]
scala> val df4 = df3.withColumn("maxx",sort_array(map_keys(col("max_min")))(size(col("max_min"))-1)).withColumn("minn",sort_array(map_keys(col("max_min")))(0))
df4: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 6 more fields]
scala> df4.select('col1,'col2,'col3,'minn,'maxx).show(false)
+----+----+----+----+----+
|col1|col2|col3|minn|maxx|
+----+----+----+----+----+
|b |20 |0 |0 |0 |
|b |21 |0 |0 |0 |
|b |22 |1 |0 |0 |
|b |23 |1 |22 |22 |
|b |24 |0 |22 |23 |
|b |25 |1 |22 |23 |
|a |20 |0 |0 |0 |
|a |21 |1 |0 |0 |
|a |22 |0 |21 |21 |
|a |23 |1 |21 |21 |
|a |24 |0 |21 |23 |
|a |25 |1 |21 |23 |
+----+----+----+----+----+
scala>