Попробуйте использовать when-otherwise
для достижения желаемого результата
scala> import org.apache.spark.sql.expressions.Window
scala> var df =Seq(("a", "TIN" ), ("a", "TIN" ), ("a", "SSN" ), ("b", "TIN" ), ("b", "TIN" ), ("b", "TIN" ), ("c", "SSN" ), ("c", "SSN" ), ("c","null")).toDF("cust","tax_type")
scala> df.withColumn("valid",when(size(collect_set(col("tax_type")).over(Window.partitionBy(col("cust")).orderBy(col("cust"))))>1,"N").otherwise("Y")).orderBy("cust").show()
+----+--------+-----+
|cust|tax_type|valid|
+----+--------+-----+
| a| TIN| N|
| a| SSN| N|
| a| TIN| N|
| b| TIN| Y|
| b| TIN| Y|
| b| TIN| Y|
| c| SSN| N|
| c| SSN| N|
| c| null| N|
+----+--------+-----+
Это позволит избежать всех расчетов min / max для каждой строки.