Исключить внутреннее соединение кадра данных без совпадения совпадений - PullRequest
1 голос
/ 18 апреля 2020

Я хочу объединить два кадра данных, основываясь на определенных условиях: spark scala. Однако выгода заключается в том, что если строка в df1 совпадает с какой-либо строкой в ​​df2, она не должна пытаться сопоставить ту же строку в df1 с любой другой строкой в ​​df2. Ниже приведены примерные данные и результаты, которые я пытаюсь получить.

   DF1
--------------------------------
Emp_id | Emp_Name | Address_id
1      |  ABC     |   1
2      |  DEF     |   2
3      |  PQR     |   3
4      |  XYZ     |   1

   DF2
-----------------------
Address_id | City 
1          | City_1
1          | City_2
2          | City_3
REST       | Some_City

  Output DF
----------------------------------------
Emp_id | Emp_Name | Address_id | City
1      |  ABC     |   1        | City_1
2      |  DEF     |   2        | City_3
3      |  PQR     |   3        | Some_City
4      |  XYZ     |   1        | City_1 

Примечание: - REST подобен групповому символу. Любое значение может быть равно REST.

Так что в приведенном выше примере emp_name "AB C" может совпадать с City_1, City_2 или Some_City. Выходной DF содержит только City_1, потому что он находит его первым.

Ответы [ 2 ]

1 голос
/ 19 апреля 2020

У вас, кажется, есть собственный логин c для вашего объединения. По сути, я должен был придумать ниже UDF .

Обратите внимание, что вы можете изменить лог c для UDF согласно вашему требованию.

import spark.implicits._
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.first

//dataframe 1    
val df_1 = Seq(("1", "ABC", "1"), ("2", "DEF", "2"), ("3", "PQR", "3"), ("4", "XYZ", "1")).toDF("Emp_Id", "Emp_Name", "Address_Id")

//dataframe 2
val df_2 = Seq(("1", "City_1"), ("1", "City_2"), ("2", "City_3"), ("REST","Some_City")).toDF("Address_Id", "City_Name")

// UDF logic
val join_udf = udf((a: String, b: String) => {
      (a,b) match {
        case ("1", "1") => true
        case ("1", _) => false
        case ("2", "2") => true
        case ("2", _) => false
        case(_, "REST") => true
        case(_, _) => false

    }})

val dataframe_join = df_1.join(df_2, join_udf(df_1("Address_Id"), df_2("Address_Id")), "inner").drop(df_2("Address_Id"))
                             .orderBy($"City_Name")
                             .groupBy($"Emp_Id", $"Emp_Name", $"Address_Id")
                             .agg(first($"City_Name"))
                             .orderBy($"Emp_Id")

dataframe_join.show(false)

В основном после применения UDF вы получаете все возможные комбинации совпадений.

Публикуйте, что когда вы применяете groupBy и используете first функцию agg , вы получите только отфильтрованные значения как то, что вы есть находясь в поиске.

+------+--------+----------+-----------------------+
|Emp_Id|Emp_Name|Address_Id|first(City_Name, false)|
+------+--------+----------+-----------------------+
|1     |ABC     |1         |City_1                 |
|2     |DEF     |2         |City_3                 |
|3     |PQR     |3         |Some_City              |
|4     |XYZ     |1         |City_1                 |
+------+--------+----------+-----------------------+

Обратите внимание, что я использовал Spark 2.3 и надеюсь, что это поможет!

0 голосов
/ 19 апреля 2020
{    
    import org.apache.spark.sql.{SparkSession}
    import org.apache.spark.sql.functions._

    object JoinTwoDataFrame extends App {

      val spark = SparkSession.builder()
        .master("local")
        .appName("DataFrame-example")
        .getOrCreate()

      import spark.implicits._

      val df1 = Seq(
        (1, "ABC", "1"),
        (2, "DEF", "2"),
        (3, "PQR", "3"),
        (4, "XYZ", "1")
      ).toDF("Emp_id", "Emp_Name", "Address_id")

      val df2 = Seq(
        ("1", "City_1"),
        ("1", "City_2"),
        ("2", "City_3"),
        ("REST", "Some_City")
      ).toDF("Address_id", "City")

      val restCity: Option[String] = Some(df2.filter('Address_id.equalTo("REST")).select('City).first()(0).toString)

      val res = df1.join(df2, df1.col("Address_id") === df2.col("Address_id") , "left_outer")
        .select(
          df1.col("Emp_id"),
          df1.col("Emp_Name"),
          df1.col("Address_id"),
          df2.col("City")
        )
          .withColumn("city2", when('City.isNotNull, 'City).otherwise(restCity.getOrElse("")))
          .drop("City")
          .withColumnRenamed("city2", "City")
          .orderBy("Address_id", "City")
          .groupBy("Emp_id", "Emp_Name", "Address_id")
          .agg(collect_list("City").alias("cityList"))
          .withColumn("City", 'cityList.getItem(0))
          .drop("cityList")
          .orderBy("Emp_id")

            res.show(false)

    //  +------+--------+----------+---------+
    //  |Emp_id|Emp_Name|Address_id|City     |
    //  +------+--------+----------+---------+
    //  |1     |ABC     |1         |City_1   |
    //  |2     |DEF     |2         |City_3   |
    //  |3     |PQR     |3         |Some_City|
    //  |4     |XYZ     |1         |City_1   |
    //  +------+--------+----------+---------+

    }
}
...