Spark Scala: обновить значение столбца данных из другого кадра данных - PullRequest
1 голос
/ 22 апреля 2019

a =

+------------+------------+------+
|        Name| Nationality|Salary|
+------------+------------+------+
|    A. Abbas|        Iraq|   €2K|
| A. Abdallah|      France|   €1K|
|A. Abdennour|     Tunisia|  €31K|

b =

+------------+------------+
|        Name|Salary      |
+------------+------------+
|    A. Abbas|€4K         |
| A. Abdallah|€1K         |
|A. Abdennour|€33K        |

ожидаемый обновленный DF должен выглядеть следующим образом:

+------------+------------+------+
|        Name| Nationality|Salary|
+------------+------------+------+
|    A. Abbas|        Iraq|   €4K|
| A. Abdallah|      France|   €1K|
|A. Abdennour|     Tunisia|  €33K|

Я пробовал в коде искры scala:

updatedDF = a.join(b, Seq("Name"), "inner")
updatedDF.show()

Но у меня появилось дублирование в выходных данных после выполнения соединения.Как я могу объединить между двумя фреймами данных без дублирования?

Ответы [ 2 ]

2 голосов
/ 23 апреля 2019
val a = sc.parallelize(List(("A. Abbas","Iraq","2K"),("A. Abdallah","France","1K"),("A. Abdennour","Tunisia","31K"))).toDF("Name","Nationality","Salary")
val b = sc.parallelize(List(("A. Abbas","4K"),("A. Abdallah","1K"),("A. Abdennour","33K"))).toDF("Name","Salary")
b.join(a,Seq("Name"),"inner").drop(a.col("Salary")).show

enter image description here

0 голосов
/ 23 апреля 2019

Если у вас есть дублирование, это означает, что столбец имени не является уникальным.Я предлагаю попробовать добавить столбец индекса для использования в соединении, а затем отбросить его:

    // Add index now...
    a = addColumnIndex(a).withColumn("index", monotonically_increasing_id)
    println("1- a count: " + a.count())

    // Add index now...
    b = addColumnIndex(b).withColumn("index", monotonically_increasing_id)
    println("b count: " + b.count())

    def addColumnIndex(df: DataFrame) = {
        spark.sqlContext.createDataFrame(
            df.rdd.zipWithIndex.map {
                case (row, index) => Row.fromSeq(row.toSeq :+ index)
            },
            StructType(df.schema.fields :+ StructField("index", LongType, false)))
    }

    ab = a.join(b, Seq("index", "Name"), "inner").drop(a.col("Salary")).drop(a.col("index"))

    println("3- ab count: " + ab.count())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...