Это на самом деле работает, моя проблема заключалась в том, что я вызываю метод, который использует другую карту, поэтому происходит сбой.
case class Car( id :Long, listWheels : List[Wheel])
case class Wheel( id : Long)
rddCars.flatMap(
a => a.listWheels.map(
b => (a.id, b.id, getBrandFromSerial(serialToBrandMap,b.id))
)
)
getBrandFromSerial(serialToBrandMap: RDD[(Int, String)], id : Int) : String = {
val a = serialToBrandMap.filter(_._1 == id)
val b = a.map(_._2).top(1)
b(0)
}
Ожидаемый результат - СДР [(Int, Int, String)] с идентификатором автомобиля,Идентификатор колеса и марка колеса на Tuple3.
РЕДАКТИРОВАТЬ: Образец ввода / вывода
Вход:
val wheels1 = List(Wheel(1),Wheel(1),Wheel(2), Wheel(2)
val wheels2 = List(Wheel(3),Wheel(3),Wheel(2), Wheel(2)
val rddCars : RDD[Car] = sparkContect.parallelize(List(Car(1,wheels1), Car(2, wheels2)))
val serialToBrandList : List[(Int, String)] = List((1,"Brand1"), (2,"Brand2"),(3,"Brand3"))
val serialToBrandMap : RDD[(Int, String)] = sparkContect.parallelize( serialToBrandList)
Выход:
(1,1,Brand1),(1,1,Brand1),(1,2,Brand2)....(2,3,Brand3) and so on