Конкат данных с несколькими скалами - PullRequest
0 голосов
/ 28 июня 2018

У меня есть несколько DFS, которые я хочу объединить в 1 большой DF

+----+----------+----------+
|year|   state  |     count|
+----+----------+----------+
|2014|        CT|    343477|
|2014|        DE|    123431|
|2014|        MD|    558686|
|2014|        NJ|    773321|
|2015|        CT|    343477|
|2015|        DE|    123431|
|2015|        MD|    558686|
|2015|        NJ|    773321|
|2016|        CT|    343477|
|2016|        DE|    123431|
|2016|        MD|    558686|
|2016|        NJ|    773321|
|2017|        CT|    343477|
|2017|        DE|    123431|
|2017|        MD|    558686|
|2017|        NJ|    773321|
+----+----------+----------+
+-----------------+
|count_2          |
-----------------+
|           343477|
|           123431|
|           558686|
|           773321|
|           343477|
|           123431|
|           558686|
|           773321|
|           343477|
|           123431|
|           558686|
|           773321|
|           343477|
|           123431|
|           558686|
|           773321|
+-----------------+

Я хочу объединить их в 1 ДФ

        +----+----------+----------+--------------------
        |year|   state  |     count| count_2
        +----+----------+----------+--------------------
        |2014|        CT|    343477|343477
        |2014|        DE|    123431|123431
        |2014|        MD|    558686|558686
        |2014|        NJ|    773321|773321
        |2015|        CT|    343477|343477
        |2015|        DE|    123431|123431
        |2015|        MD|    558686|558686
        |2015|        NJ|    773321|773321
        |2016|        CT|    343477|343477
        so on...

Я использовал sql (), но он не работал .. Я также пытался присоединиться к df (левое соединение), оно также не работает, какое объединение будет без дубликата? Спасибо!

1 Ответ

0 голосов
/ 28 июня 2018

Я думаю, для вашей проблемы нет ярлыка. Пожалуйста, найдите мое решение ниже

//Inputs: 
val df1=Seq((2014,"CT",343477),(2014,"DE",123431),(2014,"MD",558686),(2014,"NJ",773321),(2015,"CT",343477),(2015,"DE",123431),(2015,"MD",558686),(2015,"NJ",773321)).toDF("year","state","count")

val df2=Seq(343477,123431,558686,773321,343477,123431,558686,773321).toDF("count_2")

//Solution: 

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val winFun=Window.partitionBy("year","state","count").orderBy("year")
df1.join(df2,df1("count")===df2("count_2")).withColumn("row_no",row_number over winFun).filter("row_no =1").drop("row_no").orderBy("year").show

Пример вывода:

+----+-----+------+-------+
|year|state| count|count_2|
+----+-----+------+-------+
|2014|   DE|123431| 123431|
|2014|   MD|558686| 558686|
|2014|   CT|343477| 343477|
|2014|   NJ|773321| 773321|
|2015|   MD|558686| 558686|
|2015|   DE|123431| 123431|
|2015|   CT|343477| 343477|
|2015|   NJ|773321| 773321|
+----+-----+------+-------+
...