Удалить вершины без исходящих ребер в GraphX - PullRequest
0 голосов
/ 15 мая 2018

У меня есть большой граф (несколько миллионов вершин и ребер).Я хочу удалить все вершины (и ребра), у которых нет исходящих ребер.У меня есть код, который работает, но он медленный, и мне нужно сделать это несколько раз.Я уверен, что могу использовать какой-нибудь существующий метод GraphX, чтобы сделать его намного быстрее.

Это код, который у меня есть.

val users: RDD[(VertexId, String)] = sc.parallelize(Array((1L, "1"), (2L, "2"), (3L, "3"), (4L, "4")))
  val relationships: RDD[Edge[Double]] = sc.parallelize(
    Array(
      Edge(1L, 3L, 500.0),
      Edge(3L, 2L, 400.0),
      Edge(2L, 1L, 600.0),
      Edge(3L, 1L, 200.0),
      Edge(2L, 4L, 200.0),
      Edge(3L, 4L, 500.0)
    ))

val graph = org.apache.spark.graphx.Graph(users, relationships)

val lst = graph.outDegrees.map(x => x._1).collect
var set:scala.collection.mutable.HashSet[Long] = new scala.collection.mutable.HashSet()
for(a<- lst) {set.add(a)}
var subg = graph.subgraph(vpred = (id, attr) => set.contains(id))
//since vertex 4 has no outgoing edges, subg.edges should return 4 and subg.vertices = 3 

Я не знаю, как еще этого можно достичь.Любая помощь приветствуется!

РЕДАКТИРОВАТЬ: Я мог бы сделать это с HashSet, но я думаю, что это все еще можно улучшить.

Ответы [ 4 ]

0 голосов
/ 30 мая 2018

Вы могли бы это, чтобы найти все вершины нулевой степени.

val zeroOutDeg = graph.filter(graph => {
   val degrees: VertexRDD[Int] = graph.outDegrees
   graph.outerJoinVertices(degrees) {(vid, data, deg => deg.getOrElse(0)}
   }, vpred = (vid: VertexId, deg:Int) => deg == 0)
0 голосов
/ 15 мая 2018

Вы можете напрямую определить другой граф с отфильтрованными вершинами. Примерно так:

val lst = graph.outDegrees.map(x => x._1).collect
var graph2 = Graph(graph.vertices.filter(v => lst.contains(v)), graph.edges)
0 голосов
/ 16 мая 2018

Если вы не хотите использовать подграф, вот еще один способ использования триплетов для поиска вершин назначения, которые также являются исходными вершинами.

val graph = org.apache.spark.graphx.Graph(users, relationships)
val AsSubjects = graph.triplets.map(triplet => (triplet.srcId,(triplet)))
val AsObjects = graph.triplets.map(triplet => (triplet.dstId,(triplet)))
val ObjectsJoinSubjects = AsObjects.join(AsSubjects)
val ObjectsJoinSubjectsDistinct = ObjectsJoinSubjects.mapValues(x => x._1).distinct()
val NewVertices = ObjectsJoinSubjectsDistinct.map(x => (x._2.srcId, x._2.srcAttr)).distinct()
val NewEdges = ObjectsJoinSubjectsDistinct.map(x => new Edge(x._2.srcId, x._2.dstId, x._2.attr))
val newgraph = Graph(NewVertices,NewEdges)

Я не уверен, обеспечивает ли это улучшение по сравнению с подграфом, потому что в моем решении используется метод Different (), который стоит дорого. Я проверил с графиком, который вы предоставили, и мое решение на самом деле занимает больше времени. Тем не менее, я чувствую, что это маленький пример. Поэтому я бы посоветовал вам протестировать диаграмму большего размера и сообщить нам, работает ли она лучше.

0 голосов
/ 15 мая 2018

Первая оптимизация вашего кода - иметь набор, а не массив, что сделало бы поиск O (1), а не O (n)

Но это не масштабируется, так как вы собираете все в драйвере, а затем отправляете его обратно исполнителям. Правильный способ - вызвать joinVertices с outDegrees и просто отобразить на исходный график.

...