Если я правильно понимаю ваше требование, вот подход:
- Создать СДР, скажем,
rdd2c2
, отличающихся значений из 2-го столбца rdd2
- Выполните
cartesian join
для rdd1
и rdd2c2
и преобразуйте результат в СДР [K, V], чтобы сделать the
1-й столбец и rdd2c2
столбец в качестве key
- Преобразование
rdd2
в СДР [K, V], чтобы сделать его 1-й и 2-й столбцы его key
- Выполнить
leftOuterJoin
на двух СДР [K, V] s и преобразовать элементы в требуемую структуру
Пример кода:
val rdd1 = sc.parallelize(Seq(
("a", 1, "some_values1"),
("b", 1, "some_values2"),
("c", 1, "some_values3")
))
val rdd2 = sc.parallelize(Seq(
("a", 2, "some_values1"),
("b", 2, "some_values2"),
("a", 3, "some_values1"),
("b", 3, "some_values2"),
("c", 3, "some_values2")
))
val rdd2c2 = rdd2.map(_._2).distinct
// rdd2c2.collect: Array[Int] = Array(2, 3)
val rddKV1 = rdd1.cartesian(rdd2c2).
map{ case (a, b) => ((a._1, b), (a._2, a._3))}
// rddKV1.collect: Array[((String, Int), (Int, String))] = Array(
// ((a,2),(1,some_values1)),
// ((a,3),(1,some_values1)),
// ((b,2),(1,some_values2)),
// ((b,3),(1,some_values2)),
// ((c,2),(1,some_values3)),
// ((c,3),(1,some_values3)))
val rddKV2 = rdd2.map(r => ((r._1, r._2), r._3))
// rddKV2.collect: Array[((String, Int), String)] = Array(
// ((a,2),some_values1),
// ((b,2),some_values2),
// ((a,3),some_values1),
// ((b,3),some_values2),
// ((c,3),some_values2))
val rddJoined = rddKV1.leftOuterJoin(rddKV2).
map{ case (k, v) => (k._1, v._1._1, k._2, v._2) }
// rddJoined.collect: Array[(String, Int, Int, Option[String])] = Array(
// (a,1,3,Some(some_values1)),
// (a,1,2,Some(some_values1)),
// (c,1,2,None),
// (b,1,2,Some(some_values2)),
// (b,1,3,Some(some_values2)),
// (c,1,3,Some(some_values2)))