Декартово произведение между вершинами GraphX - PullRequest
0 голосов
/ 03 мая 2018

Я хотел бы сделать декартово произведение между узлами Графа. Я хочу построить их матрицу расстояний. Возможно, это не очень хороший подход, поэтому любые предложения приветствуются.

Это мой код, и он не работает, у меня нет ни предупреждений, ни исключений, он просто не работает. Я думаю, может быть потому, что я пытаюсь сделать декартово произведение с таким же RDD, но я не знаю, как это исправить, как сделать вложенный цикл или что-то, что может помочь мне вычислить эта матрица.

val indexes1 = graph.vertices.map(_._1)
val indexes2 = graph.vertices.map(_._1)

val cartesian = indexes1.cartesian(indexes2).cache()
cartesian.map(pair => matrix.updated(pair._1, shortPathBetween(pair._1, pair._2)))

def shortPathBetween(v1:VertexId, v2:VertexId) : Int = {
    val path = ShortestPaths.run(graph, Seq(v2))
    val shortestPath = path.vertices.filter({case (vId, _ ) => vId == v1})
        .first()
        ._2
        .get(v2)

    shortestPath.getOrElse(-1)
}

1 Ответ

0 голосов
/ 03 мая 2018

Я бы подошел к этому, используя pregel API. Это позволяет параллельное прохождение графа от каждого узла. Если вы отслеживаете расстояния и обновляете их во время обхода с весом ребра, вы получаете вершины с расстояниями до каждой (достижимой) другой вершины.

Если вы, например, возьмете этот ориентированный граф:

directed graph

Вы можете инициировать это в Spark GraphX ​​следующим образом:

val graphData = List(
    (0, 0, 1, 10.0),
    (1, 0, 2, 5.0),
    (2, 1, 2, 2.0),
    (3, 1, 3, 1.0),
    (4, 2, 1, 3.0),
    (5, 2, 3, 9.0),
    (6, 2, 4, 2.0),
    (7, 3, 4, 4.0),
    (8, 4, 0, 7.0),
    (9, 4, 3, 5.0)
  ).toDF("id", "from", "to", "distance")

  val vertexRDD: RDD[(Long, Int)] = graphData.flatMap(_.getValuesMap[Int](List("to", "from")).values).distinct().map(i => (i.toLong, i)).rdd
  val edgeRDD: RDD[Edge[Double]] = graphData.map(x => Edge(x.getInt(1), x.getInt(2), x.getDouble(3))).rdd
  val graph: Graph[Int, Double] = Graph(vertexRDD, edgeRDD)

Для вызова pregel требуется 3 функции

  • vprog для инициализации каждой вершины сообщением (в этом случае пустая карта [VertexId, Double] для отслеживания расстояний)
  • sendMsg шаг обновления, который применяется к каждой итерации (в этом случае обновление расстояний путем добавления веса ребра и возврата Итератора с сообщениями для отправки на следующую итерацию
  • mergeMsg для объединения двух сообщений (2 Map [VertexId, Double] s в 1, сохраняя кратчайшее расстояние)

В коде это может выглядеть так:

def vprog(id: VertexId, orig: Map[VertexId, Double], newly: Map[VertexId, Double]): Map[VertexId, Double] = newly

def mergeMsg(a: Map[VertexId, Double], b: Map[VertexId, Double]): Map[VertexId, Double] = (a.toList ++ b.toList).groupBy(_._1).map{ // mapValues is not serializable :-(
    case (id, v) => id -> v.map(_._2).min // keep shortest distance in case of duplicate
}

def sendMsg(trip: EdgeTriplet[Map[VertexId, Double], Double]): Iterator[(VertexId, Map[VertexId, Double])] = {
    val w = trip.attr // weight of edge from src -> dst
    val distances = trip.dstAttr.mapValues(_ + w) + // update collected distances at dst + edge weight
      (trip.srcId -> 0.0, trip.dstId -> w) // set distance to src to 0  and to dst the edge weight

    // If src contains as much nodes as dst (we traversed all)
    if(trip.srcAttr.keySet.intersect(distances.keySet).size != distances.keySet.size)
      Iterator((trip.srcId, distances))
    else
      Iterator.empty
}

Затем запустите pregel, соберите вершины и поверните карту, чтобы получить матрицу расстояний.

val initMap = Map.empty[VertexId, Double]

val result = graph
    .mapVertices((_,_) => initMap)
    .pregel(
      initialMsg = initMap,
      activeDirection = EdgeDirection.Out
    )(vprog, sendMsg, mergeMsg)
    .vertices
    .toDF("id","map")
    .select('id, explode('map))
    .groupBy("id")
    .pivot("key")
    .agg(min("value"))
    .orderBy("id")
    .show(false)

Результат будет выглядеть как

+---+----+----+----+----+---+
|id |0   |1   |2   |3   |4  |
+---+----+----+----+----+---+
|0  |0.0 |8.0 |5.0 |11.0|7.0|
|1  |11.0|0.0 |2.0 |1.0 |4.0|
|2  |9.0 |3.0 |0.0 |4.0 |2.0|
|3  |11.0|21.0|16.0|0.0 |4.0|
|4  |7.0 |15.0|12.0|5.0 |0.0|
+---+----+----+----+----+---+

Возможно, есть другие / лучшие способы, но это кажется вычислительно менее интенсивным, чем вычисление кратчайшего пути между узлами как декартово произведение; -)

...