Исходя из того, что у вас есть RDD
и Map
, вы можете сделать это непосредственно итерируя ваш RDD
.
Предполагая, что Rating
имеет 3 поля (rdd1, rdd2, rdd3),давайте переименуем их в field1
, field2
и field3
, чтобы проиллюстрировать пример и избежать путаницы.
Предоставление этого примера источника ввода:
case class Rating(field1: String, field2: Int, field3: String) // custom case class
val yourRDD = spark.sparkContext.parallelize(
Seq(
Rating("rating1", 1, "str1"), // item 1
Rating("rating2", 2, "str2"), // item 2
Rating("rating3", 3, "str3") // item 3
)
)
yourRDD.toDF.show() // to visualize()
это выведет ваши данныеисточник, который будет выглядеть так:
+-------+------+------+
| field1|field2|field3|
+-------+------+------+
|rating1| 1| str1|
|rating2| 2| str2|
|rating3| 3| str3|
+-------+------+------+
Аналогично, у вас есть этот пример данных для вашей карты:
val yourMap = Map(
1 -> 1.111,
2 -> 2.222,
3 -> 3.333
)
println(yourMap)
Данные на вашей карте:
yourMap: scala.collection.immutable.Map[Int,Double] = Map(1 -> 1.111, 2 -> 2.222, 3 -> 3.333)
Затем, чтобы «объединить», вам нужно всего лишь повторить ваш RDD
, получить значение, которое вы собираетесь использовать как key
, в данном случае field2
, и использовать его как key
для map
. Примерно так:
yourRDD
.map(rating=>{ // iterate each item in your RDD
val key = rating.field2 // get the value from the current item
val valueFromMap = yourMap(key) // look for the value on the map using field2 as key - You need to handle null values in case that you wont have values for all the keys
(key, rating.field3, valueFromMap) // generating an output for a new RDD that will be created based on this
}).toDF.show(truncate=false) // visualize the output
Вышеприведенный код выведет:
+---+----+-----+
|_1 |_2 |_3 |
+---+----+-----+
|1 |str1|1.111|
|2 |str2|2.222|
|3 |str3|3.333|
+---+----+-----+
Надеюсь, это может помочь