переназначение значения столбцу не работает, но создайте новый столбец, который нельзя выбрать - PullRequest
0 голосов
/ 03 февраля 2020

Я написал функцию scala для объединения двух фреймов данных с одной и той же схемой, говорит df1 и df2. Для каждого ключа в df1, если ключ df1 совпадает с df2, мы выбираем значения из df2 для этого ключа, если нет, то оставляем значение df1. Он должен возвращать фрейм данных с тем же номером df1, но другим значением, но функция не работает, и возвращает тот же df, что и df1.

  def joinDFwithConditions(df1: DataFrame, df2: DataFrame, key_seq: Seq[String]) ={

  var final_df = df1.as("a").join(df2.as("b"), key_seq, "left_outer")
  //set of non-key columns
  val col_str = df1.columns.toSet -- key_seq.toSet
  for (c <- col_str){ //for every match-record, check values from both dataframes
  final_df = final_df
        .withColumn(s"$c", 
            when(col(s"b.$c").isNull || col(s"b.$c").isNaN,col(s"a.$c"))
            .otherwise(col(s"b.$c")))
         // I used to re-assign value with reference "t.$c",
         // but return error says no t.col found in schema
}
  final_df.show()

  final_df.select(df1.columns.map(x => df1(x)):_*)

}


  def main(args: Array[String]) {
  val sparkSession = SparkSession.builder().appName(this.getClass.getName)
  .config("spark.hadoop.validateOutputSpecs", "false")
  .enableHiveSupport()
  .getOrCreate()
  import sparkSession.implicits._

  val df1 = List(("key1",1),("key2",2),("key3",3)).toDF("x","y")

  val df2 = List(("key1",9),("key2",8)).toDF("x","y")

  joinDFwithConditions(df1, df2, Seq("x")).show()

  sparkSession.stop()
}

образец df1

+--------------++--------------------+
|x             ||y                   |     
+--------------++--------------------+
| key1         ||1                   |
| key2         ||2                   |
| key3         ||3                   |
--------------------------------------

образец df2

+--------------++--------------------+
|x             ||y                   |     
+--------------++--------------------+
| key1         ||9                   |
| key2         ||8                   |
--------------------------------------

ожидаемые результаты:

+--------------++--------------------+
|x             ||y                   |     
+--------------++--------------------+
| key1         ||9                   |
| key2         ||8                   |
| key3         ||3                   |
--------------------------------------

что действительно показывает:

+-------+---+---+
|  x    |  y|  y|
+-------+---+---+
|  key1 |  9|  9|
|  key2 |  8|  8|
|  key3 |  3|  3|
+-------+---+---+

сообщение об ошибке


ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Resolved attribute(s) y#6 missing from x#5,y#21,y#22 in operator !Project [x#5, y#6]. Attribute(s) with the same name appear in the operation: y. Please check if the right attribute(s) are used.;;
!Project [x#5, y#6]
+- Project [x#5, CASE WHEN (isnull(y#15) || isnan(cast(y#15 as double))) THEN y#6 ELSE y#15 END AS y#21, CASE WHEN (isnull(y#15) || isnan(cast(y#15 as double))) THEN y#6 ELSE y#15 END AS y#22]
   +- Project [x#5, y#6, y#15]
      +- Join LeftOuter, (x#5 = x#14)
         :- SubqueryAlias `a`
         :  +- Project [_1#2 AS x#5, _2#3 AS y#6]
         :     +- LocalRelation [_1#2, _2#3]
         +- SubqueryAlias `b`
            +- Project [_1#11 AS x#14, _2#12 AS y#15]
               +- LocalRelation [_1#11, _2#12]

1 Ответ

0 голосов
/ 03 февраля 2020

Когда вы делаете df.as("a"), вы не переименовываете столбец данных. Вы просто позволяете получить к ним доступ с помощью a.columnName, чтобы устранить неопределенность. Следовательно, ваш when работает хорошо, потому что вы используете псевдонимы, но в итоге вы получаете несколько y столбцов. Я весьма удивлен тем, как ему удается заменить один из столбцов y ...

Однако, когда вы пытаетесь получить к нему доступ с именем y (без префикса), spark знает, какой тот, который вы хотите, и выдает ошибку.

Чтобы избежать ошибок, вы можете просто сделать все, что вам нужно, с одним select, например так:

df1.as("a").join(df2.as("b"), key_cols, "left_outer")
    .select(key_cols.map(col) ++
        df1
            .columns
            .diff(key_cols)
            .map(c => when(col(s"b.$c").isNull || col(s"b.$c").isNaN, col(s"a.$c"))
                   .otherwise(col(s"b.$c"))
                   .alias(c)
            ) : _*)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...