Instead of using multiple joins, I would suggest a single join followed by a pivot here. Here is a code snippet to look at:
>>> import pyspark.sql.functions as F
>>> df1 = spark.createDataFrame([[12345,"10/01/2018"],[75852,"10/01/2018"],[33995,"10/01/2018"]],["product_id","eff_date"])
>>> df1.show()
+----------+----------+
|product_id| eff_date|
+----------+----------+
| 12345|10/01/2018|
| 75852|10/01/2018|
| 33995|10/01/2018|
+----------+----------+
>>> df2 = spark.createDataFrame([[12345,"KA","HOS"],[12345,"HM","AMB"],[75852,"GB","HOS"],[33995,"HD","AMB"]],["product_id","Segment","type"])
>>> df2.show()
+----------+-------+----+
|product_id|Segment|type|
+----------+-------+----+
| 12345| KA| HOS|
| 12345| HM| AMB|
| 75852| GB| HOS|
| 33995| HD| AMB|
+----------+-------+----+
>>> df1.join(df2, df1.product_id == df2.product_id, "inner").groupBy(df2.product_id, df1.eff_date).pivot("type").agg(F.first(df2.Segment)).show()
+----------+----------+----+----+
|product_id| eff_date| AMB| HOS|
+----------+----------+----+----+
| 12345|10/01/2018| HM| KA|
| 33995|10/01/2018| HD|null|
| 75852|10/01/2018|null| GB|
+----------+----------+----+----+
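By the way, if you already know the distinct values of the pivot column, you can pass them explicitly to pivot(); that spares Spark the extra job it otherwise runs to compute them. A minimal sketch against the same df1/df2, equivalent to the query above but with the values fixed up front:
>>> df1.join(df2, "product_id").groupBy("product_id", "eff_date").pivot("type", ["HOS", "AMB"]).agg(F.first("Segment")).show()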
With Spark SQL 2.4+ you can also use the SQL PIVOT clause directly:
>>> df1.createOrReplaceTempView("df1_temp")
>>> df2.createOrReplaceTempView("df2_temp")
>>> spark.sql("select * from (select a.*, b.Segment, b.type from df1_temp a inner join df2_temp b on a.product_id = b.product_id) PIVOT (first(Segment) for type in ('HOS' HOS_segment, 'AMB' AMB_Segment))").show()
+----------+----------+-----------+-----------+
|product_id| eff_date|HOS_segment|AMB_Segment|
+----------+----------+-----------+-----------+
| 12345|10/01/2018| KA| HM|
| 33995|10/01/2018| null| HD|
| 75852|10/01/2018| GB| null|
+----------+----------+-----------+-----------+
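If you prefer the DataFrame API but want the same column names the SQL aliases produce, you can rename the pivoted columns afterwards. A minimal sketch (the names just mirror the SQL version above):
>>> df1.join(df2, "product_id").groupBy("product_id", "eff_date").pivot("type").agg(F.first("Segment")).withColumnRenamed("HOS", "HOS_segment").withColumnRenamed("AMB", "AMB_Segment").show()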
I hope this helps. Let me know if you have any questions about it.