Как добиться внутреннего соединения, используя Java SparkSql - PullRequest
0 голосов
/ 19 февраля 2020

У меня есть два набора данных как

DATASET1

+-------+--------------------+  
|     id|                name|  
+-------+--------------------+  
|S703401|      Ryan P Cassidy|  
|S703401|Christopher J Mat...|  
|S703401|      Frank E LaSota|    
|S703401|      Ryan P Cassidy|  
|S703401|Anthony L Locricchio|  
|S703401|         Jason Monte|  
+-------+--------------------+  

DATASET2

+-------+------+  
|     id|   nic|  
+-------+------+    
|S703401|  RC82|  
|S703401|    NA|  
|S703401|   FL3|  
|S703401|  RC82|  
|S703401|    NA|  
|S703401|JM2080|  
+-------+------+

, и я хочу присоединить их к идентификатору, чтобы можно было выводить как

+-------+--------------------+-----------+  
|     id|                name|       nic |   
+-------+--------------------+-----------+  
|S703401|      Ryan P Cassidy|       RC82|  
|S703401|Christopher J Mat...|         NA|  
|S703401|      Frank E LaSota|        FL3|  
|S703401|      Ryan P Cassidy|       RC82|  
|S703401|Anthony L Locricchio|         NA|  
|S703401|         Jason Monte|     JM2080|  
+-------+--------------------+-----------+  

Я использую java spark Набор данных join = dataset1.join (dataset2, "id"); но из них я получаю декартово произведение для всех строк, например

+-------+--------------------+------+  
|     id |                 name|   nic|  
+-------+--------------------+------+  
|S703401|      Ryan P Cassidy|JM2080|  
|S703401|      Ryan P Cassidy|    NA|  
|S703401|      Ryan P Cassidy|  RC82|  
|S703401|      Ryan P Cassidy|   FL3|  
|S703401|      Ryan P Cassidy|    NA|  
|S703401|      Ryan P Cassidy|  RC82|  
|S703401|Christopher J Mat...|JM2080|  
|S703401|Christopher J Mat...|    NA|  
|S703401|Christopher J Mat...|  RC82|  
|S703401|Christopher J Mat...|   FL3|  
|S703401|Christopher J Mat...|    NA|  
|S703401|Christopher J Mat...|  RC82|  
|S703401|      Frank E LaSota|JM2080|  
|S703401|      Frank E LaSota|    NA|  
|S703401|      Frank E LaSota|  RC82|  
|S703401|      Frank E LaSota|   FL3|  
|S703401|      Frank E LaSota|    NA|  
|S703401|      Frank E LaSota|  RC82|  
|S703401|      Ryan P Cassidy|JM2080|  
|S703401|      Ryan P Cassidy|    NA|  
+-------+--------------------+------+   

Так чего мне здесь не хватает?

1 Ответ

0 голосов
/ 19 февраля 2020

Не использовать функцию monotonically_increasing_id для генерации rowId. Это дает случайное число, которое не сработает join logi c. Так что go с row_number функцией, которая даст вам правильный rowId во всех ваших случаях.

scala> import org.apache.spark.sql.expressions.Window

scala> df.show()
+-------+------------------+
|     id|              name|
+-------+------------------+
|S703401|      RyanPCassidy|
|S703401|ChristopherJMat...|
|S703401|      FrankELaSota|
|S703401|      RyanPCassidy|
|S703401|AnthonyLLocricchio|
|S703401|        JasonMonte|
+-------+------------------+


scala> df1.show()
+-------+------+
|     id|   nic|
+-------+------+
|S703401|  RC82|
|S703401|    NA|
|S703401|   FL3|
|S703401|  RC82|
|S703401|    NA|
|S703401|JM2080|
+-------+------+


scala> val w = Window.partitionBy(col("id")).orderBy(lit(1))

scala> df.withColumn("rw", row_number.over(w))
         .join(df1.withColumn("rw", row_number.over(w)), List("id", "rw"),"left")
         .drop("rw")
         .show(false)
+-------+------------------+------+
|id     |name              |nic   |
+-------+------------------+------+
|S703401|RyanPCassidy      |RC82  |
|S703401|ChristopherJMat...|NA    |
|S703401|FrankELaSota      |FL3   |
|S703401|RyanPCassidy      |RC82  |
|S703401|AnthonyLLocricchio|NA    |
|S703401|JasonMonte        |JM2080|
+-------+------------------+------+
...