Сначала вы должны использовать UDF для обработки диапазона:
val df = Seq(("A","1","01","A","[50,90]","55","1")).toDF("city","day","month","tag","range","value","rank")
+----+---+-----+---+-------+-----+----+
|city|day|month|tag| range|value|rank|
+----+---+-----+---+-------+-----+----+
| A| 1| 01| A|[50,90]| 55| 1|
+----+---+-----+---+-------+-----+----+
def checkRange(range : String,rank : String, value : String) : String = {
val rangeProcess = range.dropRight(1).drop(1).split(",")
if (rank.toInt > 5){
"negative"
} else {
if (value > rangeProcess(0) && value < rangeProcess(1)){
"positive"
} else {
"negative"
}
}
}
val checkRangeUdf = udf(checkRange _)
df.withColumn("Result",checkRangeUdf(col("range"),col("rank"),col("value"))).show()
+----+---+-----+---+-------+-----+----+--------+
|city|day|month|tag| range|value|rank| Result|
+----+---+-----+---+-------+-----+----+--------+
| A| 1| 01| A|[50,90]| 55| 1|positive|
+----+---+-----+---+-------+-----+----+--------+
val result = df.withColumn("Result",checkRangeUdf(col("range"),col("rank"),col("value"))).groupBy("city","Result").count.show
+----+--------+-----+
|city| Result|count|
+----+--------+-----+
| A|positive| 1|
+----+--------+-----+