Пожалуйста, предложите лучший способ написать встроенную функцию в месте вызова func_1. Также он должен делать то, что пытается сделать func_1.
(я знаю, что функция не может вернуть две вещи в scala)
Я читаю строки из файла (args (0)) , где каждая строка состоит из чисел, разделенных запятой.
Для каждой строки первое число nodeId , а остальные числа - это соседей
Для первых 5 строк первое число само по себе является cluseterId .
graph содержит каждый узел с Long: nodeId, Long: clusterId и List [Long]: соседей
Я пытаюсь написать карту, чтобы уменьшить функциональность, где эта функция "func_1" подобна преобразователю, который генерирует (nodeId, clusterId, соседей), а затем проверяет каждый элемент в соседях и, если clusterId > -1, то излучает (nodeId, clusterId). Короче говоря кортеж (nodeId, clusterId, соседей) должен передаваться безоговорочно
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import scala.collection.mutable.ListBuffer
object Partition {
val depth = 6
def func_1(nodeId:Long,clusterId:Long,neightbours:List[Long]):Either[(Long,Long,List[Long]),(Long,Long)]={
Left(nodeId,clusterId,neightbours)
for(x <- neightbours){
if(clusterId > -1){
Right(x,clusterId)
}
}
}
def func_2(){
}
def main ( args: Array[ String ] ) {
val conf=new SparkConf().setAppName("Partition")
val sc=new SparkContext(conf)
var count : Int = 0
var graph=sc.textFile(args(0)).map(line =>{
var nodeId:Long=line(0).toLong
var clusterId:Long=1
var neighbours=new ListBuffer[Long]()
if(count < 5){
clusterId=line(0).toLong
}else{
clusterId= -1 * clusterId
}
val nums=line.split(",")
for(i <- 1 to line.length()-1){
neighbours.+=(nums(i).toLong)
}
(nodeId,clusterId,neighbours.toList)
}).collect()
graph.foreach(println)
for (i <- 1 to depth)
graph = graph.flatMap{ func_1 }.groupByKey.map{ /* (2) */ }
/* finally, print partition sizes */
}
}