Я использовал фрейм данных spark scala для получения желаемого результата.
val df1 = sc.parallelize(Seq(("a","ex1"),("b","ex4"),("c","ex2"),("d","ex6"),("e","ex3"))).toDF("df1_col1","df1_col2")
val df2 = sc.parallelize(Seq((1,("a,b,c")),(2,("d,c,e")),(3,("a,e,c")))).toDF("df2_col1","df2_col2")
df2.withColumn("_tmp", explode(split($"df2_col2", "\\,"))).as("temp").join (df1,$"temp._tmp"===df1("df1_col1"),"inner").drop("_tmp","df2_col2").show
Желание вывода
+--------+--------+--------+
|df2_col1|df1_col1|df1_col2|
+--------+--------+--------+
| 2| e| ex3|
| 3| e| ex3|
| 2| d| ex6|
| 1| c| ex2|
| 2| c| ex2|
| 3| c| ex2|
| 1| b| ex4|
| 1| a| ex1|
| 3| a| ex1|
+--------+--------+--------+
Переименуйте столбец в соответствии с вашими требованиями.
Вот скриншот работающего кода
Happy Hadoooooooooooooooppppppppppppppppppp