Нечто подобное можно также сделать:
case class Items(id: String,items: List[String])
case class ItemCounts(id: String,itemcounts: List[Int])
val rdd1 = sc.parallelize(Seq(
Items("id1",List("item10","item2","item4")),
Items("id2",List("item4","item9")),
Items("id3",List("item1","item3"))))
val rdd2 = sc.parallelize(Seq(
ItemCounts("id1",List(100,200)),
ItemCounts("id2",List(200,500,100,1100)),
ItemCounts("id3",List(10))))
Создать пару RDD для присоединения.
val ItemsRDD = rdd1.map(item => (item.id, item))
val ItemsCountsRDD= rdd2.map(itemcnts => (itemcnts.id, itemcnts))
ItemsRDD.join(ItemsCountsRDD).map(x => (x._2._1.id,x._2._2.itemcounts))
.collect.foreach(println)
Результат выглядит следующим образом:
(id3,List(10))
(id1,List(100, 200))
(id2,List(200, 500, 100, 1100))