Обработка графиков с использованием GraphX ​​в Apache Spark + большие данные - PullRequest
0 голосов
/ 20 мая 2018

Я создал график, используя данные узлов и ребер в приложении Spark.Теперь я хочу составить список смежности для созданного графа.Как я могу добиться этого?

Я написал следующий код для чтения CSV-файлов для данных узлов и ребер и создания графика.

val grapha = sc.textFile("graph.csv")
val getgdata = grapha.map(line=>line.split(","))  
val node1 = getgdata.map(line=>(line(3).toLong,(line(0)))).distinct  
val node2 = getgdata.map(line=>(line(4).toLong,(line(1)))).distinct 

// This is node list of a graph.
val nodes = node1.union(node2).distinct  

//This is edge list.
val routes = getgdata.map(line=> 
 (Edge(line(3).toLong,line(4).toLong,line(2)))).distinct    

// now create graph using Graph library
val graphx = Graph(nodes,routes)  

Теперь мне нужно увидеть список смежности для каждого узлаиз этого графика.Как я могу сделать это с помощью Scala?

1 Ответ

0 голосов
/ 01 августа 2018

Глядя на ваш код, я предполагаю, что ваш graph.csv выглядит следующим образом:

node_1, node_2, node_1_relation_node_2, 1, 2
node_1, node_3, node_1_relation_node_3, 1, 3
node_2, node_3, node_2_relation_node_3, 2, 3

Теперь вы можете прочитать это в СДР следующим образом:

val graphData = sc.textFile("graph.csv").map(line => line.split(","))

Теперь, чтобы создать свой график, вам нужны две вещи:

СДР вершин,

val verticesRdd = graphData.flatMap(line => List(
    (line(3), line(0)),
    (line(4), line(1))
  )).distinct

СДП ребер,

val edgesRdd = graphData.map(line => Edge(line(3), line(4), line(2))).distinct

Теперь вы можете создавать своиgraph as,

val graph = Graph(verticesRdd, edgesRdd)

Но если вам просто нужен список смежности, вы можете получить его просто из graphData, как показано ниже,

val adjacencyRdd = graphData
  .flatMap(line => {
    val v1 = line(3).toLong
    val v2 = line(4).toLong
    List(
      (v1, v2),
      (v2, v1)
    )
  )
  .aggregateByKey(Set.empty[Long])(
    { case (adjacencySet, vertexId) => adjacencySet + vertexId }
    { case (adjacencySet1, adjacencySet2) => adjacencySet1 ++ adjacencySet2 }
  )
  .map({ case (vertexId, adjacencySet) => (vertextId, adjacencySet.toList) })
...