Как правильно вызвать функцию, используя foreach в спарк, используя scala? - PullRequest
0 голосов
/ 22 февраля 2020

Я загрузил свой список ребер в graphX ​​и создал структуру для сохранения идентификатора узлов и их соседей, используя этот код:

val nodes_neighbors: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Either).cache()

Я написал простую функцию для получения идентификатора узлов и печати их соседей. после решения моей проблемы я расширю функцию. простая функция, как показано ниже:

def keyNodes_neighbors(vertex: VertexId,neighbors_list:VertexRDD[Array[VertexId]] ) = {

     neighbors_list.filter(x=>x._1==vertex).foreach(x=>print(x._2.foreach(println)))

   }

У меня есть другая структура, в которой он хранит идентификатор узла и их вес в качестве своего свойства: important_nodes: RDD[(VertexId, Float)]

Теперь я хочу вызвать keyNodes_neighbors функция для каждого из этих узлов в important_nodes RDD.

important_nodes.keys.foreach(i=>keyNodes_neighbors(i,all_neighbors))

, когда я делаю приведенный выше код, я получаю некоторые ошибки. но когда я проверяю этот код nodes_neighbors.filter(x=>x._1==1).foreach(x=>print(x._2.foreach(println))) для случайного узла, например ID узла = 1, я получаю соседей этого узла. Пожалуйста, помогите мне с этой проблемой.

...