Исходя из того, что я понял, вы пытаетесь найти, содержит ли col2 какой-либо элемент из dArray, если да, то получить первый элемент, отсортированный из col1, в список.
val dArr = Array("x","y","z")
val df = Seq((Array("s","x","w"),Array("a","x")),(Array("v","d","e"),Array("b","z")),(Array("a","b","c"),Array("f","d"))).toDF("col1","col2")
def checkValues(arr: Array[String]) = udf((col1: Seq[String], col2: Seq[String]) => { if (col2.exists(arr.contains)) col1.toList.sortWith(_ < _).head else ""})
df.withColumn("col1_first", checkValues(dArr)($"col1",$"col2")).select($"col1_first").collectAsList
Здесь я передаю col1 & col2 в UDF, где он сравнивает col2 с arr и получает первый элемент col1 после сортировки. Затем вы можете отфильтровать непустое значение и collectAsList.
Дайте мне знать, если это работает, в противном случае предоставьте больше информации, включая ожидаемые образцы данных.
Я изменил, но взорвать кажется дорогостоящей операцией. Посмотрите, поможет ли это с вашим набором данных,
val dArr = Array("x","y","w")
val df = Seq((Array("a","b"),Array("w","x")),(Array("a","d"),Array("x","y")),(Array("c","d","f"),Array("x","y")),(Array("h"),Array("w"))).toDF("col1","col2")
def checkValues(arr: Array[String]) = udf((col1: Seq[String], col2: Seq[String]) => { arr.foldLeft(Array[String]()){case(acc, elem) => {if (col2.contains(elem)) acc :+ elem else acc}}})
val df_1 = df.withColumn("arrValues", explode(checkValues(dArr)($"col1",$"col2")))
df_1.select($"arrValues",explode($"col1").as("col1")).groupBy($"arrValues").agg(collect_set($"col1").as("col1_list")).show