У меня есть два числа в виде RDD[(String, Array[(String, Array[String])])]
.У меня есть данные в них как:
rdd1 = (4, [(0, [1,4,5,6]), (2, [4,5,6])])
(5, [(0, [1,4,5,6]), (2, [4,5,6])]) ......
rdd2 be like = (4, [(0, [1,4,6])])
(5, [(1, [2,5,6]), (2, [3,5])])......
Сначала я хочу проверить, присутствует ли ключ rdd1 в rdd2, а затем для кортежей внутри их массива. Я хочу запустить цикл for с каждым кортежем вrdd1 с каждым набором этого ключа в rdd2.Например, и rdd1, и rdd2 содержат ключ как 4. Поэтому я хочу запустить цикл for для этого ключа 4, и его элементы должны выглядеть следующим образом (0, [1,4,5,6]) (0, [1,4,6])
и (2, [4,5,6]) (0, [1,4,6])
.Итерируя эти данные, я должен выполнить некоторые операции над ними.
То, что я пытался сделать, - это объединить эти два rdds и затем применить цикл for, но это также будет выполнять итерации для кортежей с одинаковыми rdds.
val rdd3 = merged_both_rdd1_rdd2_by_key.flatMap(x=> {for(i <- 0 until x._2.size) {for(j <- i until x._2.size)} })
Но это повторяет и кортежи того же самого rdd.Я хочу только перебрать каждый из кортежей rdd1 для каждого из них с помощью rdd2.
Я пытался выполнить вложенный цикл for для двух rdds, но он дает мне некоторую ошибку.
val sortedLines2 = sortedLines1.flatMap(y => {
var myMap: Map[(String, String),Double] = Map()
val second = sortedLines12.flatMap(x => { var myMap1: Map[(String, String),Double] = Map()
for(i <- 0 until x._2.size)
{
for(j <- 0 until y._2.size)
{
if(i != j)
{
val inter = (x._2(i)._2.toSet & y._2(j)._2.toSet).size.toDouble
val union = (x._2(i)._2.toSet.size + y._2(j)._2.toSet.size).toDouble - inter
val div = inter/union
if(div >= threshold)
{
if(!myMap.contains((x._2(i)._1, y._2(j)._1)) )
{
myMap += ( (x._2(i)._1, y._2(j)._1) -> div )
myMap1 += ( (x._2(i)._1, x._2(j)._1) -> div )
}
}
}
}
}
myMap1
}
)
myMap
}
)
Byделая это, я получаю ниже ошибку:
This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.