Вы можете использовать концепцию Data Frame Join вместо соединения RDD. Это будет легко. Вы можете сослаться на мой пример кода ниже. Надеюсь, что это поможет вам.
Я считаю, что ваши данные в том же формате, как вы упоминали выше. Если он в формате CSV или любом другом формате, вы можете пропустить Шаг-2 и обновить Шаг-1 в соответствии с форматом данных. Если вам требуется вывод в формате RDD, вы можете использовать Step-5 , в противном случае вы можете проигнорировать его согласно комментарию, упомянутому во фрагменте кода.
Я изменил данные (например, A_____, B_____, C____) только для удобства чтения.
//Step1: Loading file1 and file2 to corresponding DataFrame in text format
val df1 = spark.read.format("text").load("<path of file1>")
val df2 = spark.read.format("text").load("<path of file2>")
//Step2: Spliting single column "value" into multiple column for join Key
val file1 = ((((df1.withColumn("col1", split($"value", " ")(0)))
.withColumn("col2", split($"value", " ")(1)))
.withColumn("col3", split($"value", " ")(2)))
.withColumn("col4", split($"value", " ")(3)))
.select("col1","col2", "col3", "col4")
/*
+-------+-------+----+----+
|col1 |col2 |col3|col4|
+-------+-------+----+----+
|0000003|A______|26 |F |
|0000005|B______|50 |F |
|0000007|C______|42 |F |
+-------+-------+----+----+
*/
val file2 = ((((((df2.withColumn("col1", split($"value", " ")(0)))
.withColumn("col2", split($"value", " ")(1)))
.withColumn("col3", split($"value", " ")(2)))
.withColumn("col4", split($"value", " ")(3)))
.withColumn("col5", split($"value", " ")(4)))
.withColumn("col6", split($"value", " ")(5)))
.select("col1","col2", "col3", "col4","col5","col6")
/*
+-------+----+----+----+----+----+
|col1 |col2|col3|col4|col5|col6|
+-------+----+----+----+----+----+
|0000005|82 |79 |16 |21 |80 |
|0000001|46 |39 |8 |5 |21 |
|0000004|58 |71 |20 |10 |6 |
|0000009|60 |89 |33 |18 |6 |
|0000003|30 |50 |71 |36 |30 |
|0000007|50 |2 |33 |15 |62 |
+-------+----+----+----+----+----+
*/
//Step3: you can do alias to refer column name with aliases to increase readablity
val file01 = file1.as("f1")
val file02 = file2.as("f2")
//Step4: Joining files on Key
file01.join(file02,col("f1.col1") === col("f2.col1"))
/*
+-------+-------+----+----+-------+----+----+----+----+----+
|col1 |col2 |col3|col4|col1 |col2|col3|col4|col5|col6|
+-------+-------+----+----+-------+----+----+----+----+----+
|0000005|B______|50 |F |0000005|82 |79 |16 |21 |80 |
|0000003|A______|26 |F |0000003|30 |50 |71 |36 |30 |
|0000007|C______|42 |F |0000007|50 |2 |33 |15 |62 |
+-------+-------+----+----+-------+----+----+----+----+----+
*/
// Step5: if you want file data in RDD format the you can use below command
file01.join(file02,col("f1.col1") === col("f2.col1")).rdd.collect
/*
Array[org.apache.spark.sql.Row] = Array([0000005,B______,50,F,0000005,82,79,16,21,80], [0000003,A______,26,F,0000003,30,50,71,36,30], [0000007,C______,42,F,0000007,50,2,33,15,62])
*/