Я написал функцию scala для объединения двух фреймов данных с одной и той же схемой, говорит df1 и df2. Для каждого ключа в df1, если ключ df1 совпадает с df2, мы выбираем значения из df2 для этого ключа, если нет, то оставляем значение df1. Он должен возвращать фрейм данных с тем же номером df1, но другим значением, но функция не работает, и возвращает тот же df, что и df1.
def joinDFwithConditions(df1: DataFrame, df2: DataFrame, key_seq: Seq[String]) ={
var final_df = df1.as("a").join(df2.as("b"), key_seq, "left_outer")
//set of non-key columns
val col_str = df1.columns.toSet -- key_seq.toSet
for (c <- col_str){ //for every match-record, check values from both dataframes
final_df = final_df
.withColumn(s"$c",
when(col(s"b.$c").isNull || col(s"b.$c").isNaN,col(s"a.$c"))
.otherwise(col(s"b.$c")))
// I used to re-assign value with reference "t.$c",
// but return error says no t.col found in schema
}
final_df.show()
final_df.select(df1.columns.map(x => df1(x)):_*)
}
def main(args: Array[String]) {
val sparkSession = SparkSession.builder().appName(this.getClass.getName)
.config("spark.hadoop.validateOutputSpecs", "false")
.enableHiveSupport()
.getOrCreate()
import sparkSession.implicits._
val df1 = List(("key1",1),("key2",2),("key3",3)).toDF("x","y")
val df2 = List(("key1",9),("key2",8)).toDF("x","y")
joinDFwithConditions(df1, df2, Seq("x")).show()
sparkSession.stop()
}
образец df1
+--------------++--------------------+
|x ||y |
+--------------++--------------------+
| key1 ||1 |
| key2 ||2 |
| key3 ||3 |
--------------------------------------
образец df2
+--------------++--------------------+
|x ||y |
+--------------++--------------------+
| key1 ||9 |
| key2 ||8 |
--------------------------------------
ожидаемые результаты:
+--------------++--------------------+
|x ||y |
+--------------++--------------------+
| key1 ||9 |
| key2 ||8 |
| key3 ||3 |
--------------------------------------
что действительно показывает:
+-------+---+---+
| x | y| y|
+-------+---+---+
| key1 | 9| 9|
| key2 | 8| 8|
| key3 | 3| 3|
+-------+---+---+
сообщение об ошибке
ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Resolved attribute(s) y#6 missing from x#5,y#21,y#22 in operator !Project [x#5, y#6]. Attribute(s) with the same name appear in the operation: y. Please check if the right attribute(s) are used.;;
!Project [x#5, y#6]
+- Project [x#5, CASE WHEN (isnull(y#15) || isnan(cast(y#15 as double))) THEN y#6 ELSE y#15 END AS y#21, CASE WHEN (isnull(y#15) || isnan(cast(y#15 as double))) THEN y#6 ELSE y#15 END AS y#22]
+- Project [x#5, y#6, y#15]
+- Join LeftOuter, (x#5 = x#14)
:- SubqueryAlias `a`
: +- Project [_1#2 AS x#5, _2#3 AS y#6]
: +- LocalRelation [_1#2, _2#3]
+- SubqueryAlias `b`
+- Project [_1#11 AS x#14, _2#12 AS y#15]
+- LocalRelation [_1#11, _2#12]