Как объединить два RDD с разными длинами в Spark? - PullRequest
0 голосов
/ 07 марта 2020

у меня есть 2 RDD. первый СДР - это исходный СДР, а второй - СДР, который я отфильтровал от исходного и выполнил на нем некоторые процессы. после выполнения процессов я хочу присоединиться к ним. оригинальный СДР выглядит следующим образом:

(1,5)
(2,60)
(3,7)
(4,1)
(5,1)
...
(10,8)

, а фильтрованный и управляемый СДР:

(4,3)
(5,10)
(6,6)
(7,9)

как мне присоединиться к ним ?? когда я использую fullouterjoin или другие методы соединения, это выдает ошибку

Отредактировано

Я написал код, как вы сказали, вот так:

        original_RDD=original_RDD.fullOuterJoin(new_RDD).foreach { case (joinKey, (oldOption, newOption)) =>
        newOption match {
          case None => (joinKey,oldOption)
          case Some(newOption) => (joinKey,newOption)
        }
      }

но я получить эту ошибку:

Error:(232, 55) type mismatch;
 found   : Unit
 required: org.apache.spark.rdd.RDD[(Long, Int)]
        nodes=nodes.fullOuterJoin(joined_new).foreach { case (joinKey, (oldOption, newOption)) =>

1 Ответ

4 голосов
/ 07 марта 2020

См. синтаксис объединения

При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных (K, (V, W) )) пары со всеми парами элементов для каждого ключа. Внешние объединения поддерживаются с помощью leftOuterJoin, rightOuterJoin и fullOuterJoin.

originalRdd
  .fullOuterJoin(joinRdd)
  .foreach { case (joinKey, (oldOption, newOption)) =>
    newOption match {
      case None => println("new value is None")
      case Some(joinValue) => println(s"new value = $joinValue")
    }
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...