Лучшим вариантом является уменьшение размера данных перед присоединением (мы не можем уничтожить присоединение). Мы можем уменьшить, как показано ниже:
Во-первых, загрузка данных
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> df1.show
+---+---+-----+
| c1| c2| c3|
+---+---+-----+
| a| b|abcde|
| c| d| fd|
+---+---+-----+
scala> df2.show
+---+---+----+---+
| c1| c2| c3| c4|
+---+---+----+---+
| a| b| a| 90|
| a| b| abd|100|
| a| b|abcd|150|
| c| d|wewe| 79|
+---+---+----+---+
Теперь нам нужно уменьшить размер df2 перед присоединением (это уменьшит время, необходимое для объединения, так как размер данныхменьше сравнивать) с помощью оконной функции и определения максимального значения обоих столбцов
scala> df2.withColumn("len", length($"c3")).withColumn("res", row_number().over(wind1)).filter($"res" === 1).withColumn("res2", row_number().over(wind2)).filter($"res2"=== 1).select("c1", "c2", "c3", "c4").show()
+---+---+----+---+
| c1| c2| c3| c4|
+---+---+----+---+
| c| d|wewe| 79|
| a| b|abcd|150|
+---+---+----+---+
вещей, которые можно попробовать:
1> Вы можете присоединиться к этим сокращенным фреймам данных и применить используемую логику
2> Попробуйте выполнить объединение df1.withColumn("c4", lit(0)).union(df2)
, а затем примените приведенную выше логику.
Надеюсь, это поможет