Цикл For в двух списках RDD для аналогичного ключа - PullRequest
0 голосов
/ 14 октября 2018

У меня есть два числа в виде RDD[(String, Array[(String, Array[String])])].У меня есть данные в них как:

rdd1 = (4, [(0, [1,4,5,6]), (2, [4,5,6])])

(5, [(0, [1,4,5,6]), (2, [4,5,6])]) ......

rdd2 be like = (4, [(0, [1,4,6])])

(5, [(1, [2,5,6]), (2, [3,5])])......

Сначала я хочу проверить, присутствует ли ключ rdd1 в rdd2, а затем для кортежей внутри их массива. Я хочу запустить цикл for с каждым кортежем вrdd1 с каждым набором этого ключа в rdd2.Например, и rdd1, и rdd2 содержат ключ как 4. Поэтому я хочу запустить цикл for для этого ключа 4, и его элементы должны выглядеть следующим образом (0, [1,4,5,6]) (0, [1,4,6]) и (2, [4,5,6]) (0, [1,4,6]).Итерируя эти данные, я должен выполнить некоторые операции над ними.

То, что я пытался сделать, - это объединить эти два rdds и затем применить цикл for, но это также будет выполнять итерации для кортежей с одинаковыми rdds.

val rdd3 = merged_both_rdd1_rdd2_by_key.flatMap(x=> {for(i <- 0 until x._2.size) {for(j <- i until x._2.size)} })

Но это повторяет и кортежи того же самого rdd.Я хочу только перебрать каждый из кортежей rdd1 для каждого из них с помощью rdd2.

Я пытался выполнить вложенный цикл for для двух rdds, но он дает мне некоторую ошибку.

    val sortedLines2 = sortedLines1.flatMap(y => {
                                              var myMap: Map[(String, String),Double] = Map()
                                              val second = sortedLines12.flatMap(x => { var myMap1: Map[(String, String),Double] = Map()
                                              for(i <- 0 until x._2.size)
                                              {
                                                for(j <- 0 until y._2.size)
                                                {
                                                  if(i != j)
                                                  {
                                                    val inter = (x._2(i)._2.toSet & y._2(j)._2.toSet).size.toDouble
                                                    val union = (x._2(i)._2.toSet.size + y._2(j)._2.toSet.size).toDouble - inter
                                                    val div = inter/union
                                                    if(div >= threshold)
                                                    { 
                                                      if(!myMap.contains((x._2(i)._1, y._2(j)._1)) )
                                                      {
                                                          myMap += ( (x._2(i)._1, y._2(j)._1) -> div )
                                                          myMap1 += ( (x._2(i)._1, x._2(j)._1) -> div )
                                                      }
                                                    }
                                                  }
                                                 } 
                                               }
                                               myMap1
                                              }
)
myMap
}
)

Byделая это, я получаю ниже ошибку:

    This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.

1 Ответ

0 голосов
/ 14 октября 2018

Вы можете сначала попытаться присоединиться к rdds с помощью ключа:

rddsJoin = rdd1.join(rdd2)

, а затем выполнить цикл по значениям rdd объединения:

rddsJoin.foreach{case(key,(v1,v2)) => 
        {for(vE1<-v1;vE2<-v2) {doSomething(vE1,vE2)}}}

Если вы хотитечтобы выполнить преобразование (а не операцию), замените foreach на map или flatMap в соответствии с потребностями вашего приложения.

...