Scala - объединение двух RDD на основе ключа в Spark - PullRequest
0 голосов
/ 11 сентября 2018

Я пишу приложение Spark, используя Scala. У меня есть следующие два RDD:

(a, 1, some_values1)
(b, 1, some_values2)
(c, 1, some_values3)

и

(a, 2, some_values1)
(b, 2, some_values2)
(a, 3, some_values1)
(b, 3, some_values2)

Я пытаюсь получить этот вывод:

(a, 1, 2, computed_values1)
(b, 1, 2, computed_values2)
(c, 1, 2, None)
(a, 1, 3, computed_values1)
(b, 1, 3, computed_values2)
(c, 1, 3, None)

Таким образом, буквы здесь используются для сопоставления каждой записи из первого СДР со второй. Я пытался использовать метод join, но не работал для записи c. Как мне этого добиться?

UPDATE

Другой пример:

(a, 1, some_values1)
(b, 1, some_values2)
(c, 1, some_values3)

и

(a, 2, some_values1)
(b, 2, some_values2)
(a, 3, some_values1)
(b, 3, some_values2)
(c, 3, some_values2)

Я пытаюсь получить этот вывод:

(a, 1, 2, computed_values1)
(b, 1, 2, computed_values2)
(c, 1, 2, None)
(a, 1, 3, computed_values1)
(b, 1, 3, computed_values2)
(c, 1, 3, computed_values3)

Ответы [ 2 ]

0 голосов
/ 11 сентября 2018

Если я правильно понимаю ваше требование, вот подход:

  1. Создать СДР, скажем, rdd2c2, отличающихся значений из 2-го столбца rdd2
  2. Выполните cartesian join для rdd1 и rdd2c2 и преобразуйте результат в СДР [K, V], чтобы сделать the 1-й столбец и rdd2c2 столбец в качестве key
  3. Преобразование rdd2 в СДР [K, V], чтобы сделать его 1-й и 2-й столбцы его key
  4. Выполнить leftOuterJoin на двух СДР [K, V] s и преобразовать элементы в требуемую структуру

Пример кода:

val rdd1 = sc.parallelize(Seq(
  ("a", 1, "some_values1"),
  ("b", 1, "some_values2"),
  ("c", 1, "some_values3")
))

val rdd2 = sc.parallelize(Seq(
  ("a", 2, "some_values1"),
  ("b", 2, "some_values2"),
  ("a", 3, "some_values1"),
  ("b", 3, "some_values2"),
  ("c", 3, "some_values2")
))

val rdd2c2 = rdd2.map(_._2).distinct
// rdd2c2.collect: Array[Int] = Array(2, 3)

val rddKV1 = rdd1.cartesian(rdd2c2).
  map{ case (a, b) => ((a._1, b), (a._2, a._3))}
// rddKV1.collect: Array[((String, Int), (Int, String))] = Array(
//   ((a,2),(1,some_values1)),
//   ((a,3),(1,some_values1)),
//   ((b,2),(1,some_values2)),
//   ((b,3),(1,some_values2)),
//   ((c,2),(1,some_values3)),
//   ((c,3),(1,some_values3)))

val rddKV2 = rdd2.map(r => ((r._1, r._2), r._3))
// rddKV2.collect: Array[((String, Int), String)] = Array(
//   ((a,2),some_values1),
//   ((b,2),some_values2),
//   ((a,3),some_values1),
//   ((b,3),some_values2),
//   ((c,3),some_values2))

val rddJoined = rddKV1.leftOuterJoin(rddKV2).
  map{ case (k, v) => (k._1, v._1._1, k._2, v._2) }
// rddJoined.collect: Array[(String, Int, Int, Option[String])] = Array(
//   (a,1,3,Some(some_values1)),
//   (a,1,2,Some(some_values1)),
//   (c,1,2,None),
//   (b,1,2,Some(some_values2)),
//   (b,1,3,Some(some_values2)),
//   (c,1,3,Some(some_values2)))
0 голосов
/ 11 сентября 2018

Если «c» должен быть в результате только один раз (угадайте, опечатка в желаемом выводе), можно получить с помощью такого кода:

val data1 = List(
  ("a", 1, "some_values1"),
  ("b", 1, "some_values2"),
  ("c", 1, "some_values3")
)

val data2 = List(
  ("a", 2, "some_values1"),
  ("b", 2, "some_values2"),
  ("a", 3, "some_values1"),
  ("b", 3, "some_values2")
)

val rdd1 = sparkContext.parallelize(data1)
val rdd2 = sparkContext.parallelize(data2)

val rdd1WithKey = rdd1.map(v => (v._1, (v._2, v._3)))
val rdd2WithKey = rdd2.map(v => (v._1, (v._2, v._3)))

val joined = rdd1WithKey.fullOuterJoin(rdd2WithKey)
joined.foreach(println)

Выход:

(b,(Some((1,some_values2)),Some((2,some_values2))))
(a,(Some((1,some_values1)),Some((2,some_values1))))
(b,(Some((1,some_values2)),Some((3,some_values2))))
(a,(Some((1,some_values1)),Some((3,some_values1))))
(c,(Some((1,some_values3)),None))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...