Spark Join датафреймы и условно обновить столбцы - PullRequest
0 голосов
/ 28 мая 2019

Привет, у меня есть 2 свечи данных.
Первый:

+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update|         uid|segment_comp_11|cluster_comp_170|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|              2|              2|     IT|  41.884|  13.5204| 2019-04-15|d@rNdBkkN-p3|             10|               3|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|Ie2Bbs9PUR8h|             15|               4|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|Jk2Bbs9PUR8h|             15|               4|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+  

и второй:

+---------------+---------------+-------+--------+---------+-----------+------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update|         uid|
+---------------+---------------+-------+--------+---------+-----------+------------+
|              4|             17|     IT| 40.8413|  14.2008| 2019-04-16|ASBuzjKa6nIB|
|              2|              2|     IT|  41.884|  15.5204| 2019-04-16|d@rNdBkkN-p3|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-16|Ie2Bbs9PUR8h|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|xyzBbs9PUR8h|
+---------------+---------------+-------+--------+---------+-----------+------------+  

кроме страны, широты, долготы, last_update и uid, нижний Df может иметь различные добавленные столбцы.
Идея состоит в том, чтобы сделать полное соединение с помощью uid, обновить общие столбцы и сохранить необычные столбцы.
Как я мог выполнить эту задачу? Благодаря.

Ответы [ 3 ]

1 голос
/ 29 мая 2019

Вот код (вы не указали, поэтому давайте попробуем Scala):

// Your dataframes
val upper = ...
val lower = ...

// Find out the columns
val sharedCols = upper.columns.toSet & lower.columns.toSet
val disjointCols = (upper.columns.toSet | lower.columns.toSet) -- sharedCols
val columns = (sharedCols.map(c => coalesce(lower.col(c), upper.col(c)).as(c)) ++ disjointCols.map(c => col(c))).toList

// Join and project    
val joined = upper.join(lower, upper.col("uid") === lower.col("uid"), "full_outer").select(columns:_*)
joined.show
0 голосов
/ 30 мая 2019

Я нашел это решение, чтобы избежать перетасовки из-за объединения.
Что вы, ребята, думаете?
Могу ли я использовать какие-либо улучшения или ярлыки для scala?

def func_union_name(myCols: Set[String], allCols: Set[String]) = {
    allCols.toList.map(x => x match {
      case x if myCols.contains(x) => col(x)
      case _ => lit(null).as(x)
    })
  }  

После определения вышеуказанной функции я делаю:

      val upper_col = tableToUpdate.columns.toSet
      val bottom_col = miniJoin.columns.toSet
      val union_cols = tableToUpdate_col ++ miniJoin_col

          upper
            .select(func_union_name(tableToUpdate_col, union_cols): _*)
            .union(bottom.select(func_union_name(bottom_col, union_cols): _*))            
            .withColumn("max_lu",max(col("last_update"))
                                  .over(Window.partitionBy(col("uid"))))
            .filter(col("last_update").geq(col("max_lu")))
            .drop(col("max_lu"))
0 голосов
/ 29 мая 2019

Если, как вы сказали в комментариях, вы хотите всегда общие столбцы из нижней таблицы. Вы можете сделать простое объединение, теряя обычные облака из df1 перед объединением.

joined_df = df1.drop("some_common_columns").join(df2,Seq("uid"))

Это оставит вас с объединенными данными только с общими облаками из df1 и необычными обоими dfs в новом join_df

...