Check this out:
scala> val df3 = df1.alias("t1").join(df2.alias("t2"),$"product_no" === $"p_no" && $"dist" === $"vendor").withColumn("match", when($"t1.code"===$"t2.code",lit(1)).when(regexp_extract($"t1.code",".*j",0)=!=lit("") && regexp_extract($"t2.code",".*j",0)=!=lit(""), 2).when(regexp_extract($"t1.code",".*[^j]$",0)=!=lit("") && regexp_extract($"t2.code","[*][*]",0)=!=lit(""), 3).otherwise(lit(0))).filter('match > 0).toDF("product_no","dist","code1","p_no","vendor","code2","product","match")
df3: org.apache.spark.sql.DataFrame = [product_no: string, dist: string ... 6 more fields]
scala> val df4= df3.withColumn("match2", collect_set('code2) over(Window.partitionBy('product_no,'dist).orderBy('match)))
df4: org.apache.spark.sql.DataFrame = [product_no: string, dist: string ... 7 more fields]
scala> df4.show
+----------+----+-----+----+------+-----+-------+-----+------------+
|product_no|dist|code1|p_no|vendor|code2|product|match| match2|
+----------+----+-----+----+------+-----+-------+-----+------------+
| 040| wmn| mn| 040| wmn| mn| n| 1| [mn]|
| 040| wmn| aj| 040| wmn| *j| y| 2| [*j, mn]|
| 040| wmn| mn| 040| wmn| **| y| 3|[*j, mn, **]|
| 040| wmn| lm| 040| wmn| **| y| 3|[*j, mn, **]|
+----------+----+-----+----+------+-----+-------+-----+------------+
scala> df4.selectExpr("*"," match in (1,2) or ( not array_contains(match2,code1) ) as match3 ").where('match3).show
+----------+----+-----+----+------+-----+-------+-----+------------+------+
|product_no|dist|code1|p_no|vendor|code2|product|match| match2|match3|
+----------+----+-----+----+------+-----+-------+-----+------------+------+
| 040| wmn| mn| 040| wmn| mn| n| 1| [mn]| true|
| 040| wmn| aj| 040| wmn| *j| y| 2| [*j, mn]| true|
| 040| wmn| lm| 040| wmn| **| y| 3|[*j, mn, **]| true|
+----------+----+-----+----+------+-----+-------+-----+------------+------+
scala>