Чтобы создать GraphX
график, вам нужно извлечь вершины из вашего кадра данных и связать их с идентификаторами.Затем вам нужно извлечь ребра (2 кортежа вершин + метаданные), используя эти идентификаторы.И все, что должно быть в RDD, а не в фреймах данных.
Другими словами, вам нужно RDD[(VertexId, X)]
для вершин и RDD[Edge(VertexId, VertexId, Y)]
, где X
- метаданные вершины, а Y
- край.метаданные.Обратите внимание, что VertexId
это просто псевдоним для Long
.
В вашем случае, с "S" и "O" столбцами вершин и "P" столбцом ребер, это будет выглядеть следующим образом.
// Let's create the vertex RDD.
val vertices : RDD[(VertexId, String)] = df
.select(explode(array('S, 'O))) // S and O are the vertices
.distinct // we remove duplicates
.rdd.map(_.getAs[String](0)) // transform to RDD
.zipWithIndex // associate a long index to each vertex
.map(_.swap)
// Now let's define a vertex dataframe because joins are clearer in sparkSQL
val vertexDf = vertices.toDF("id", "node")
// And let's extract the edges and join their vertices with their respective IDs
val edges : RDD[Edge(VertexId, VertexId, String)] = df
.join(vertexDf, df("S") === vertexDf("node")) // getting the IDs for "S"
.select('P, 'O, 'id as 'idS)
.join(vertexDf, df("O") === vertexDf("node")) // getting the IDs for "O"
.rdd.map(row => // creating the edge using column "P" as metadata
Edge(row.getAs[Long]("idS"), row.getAs[Long]("id"), row.getAs[String]("P")))
// And finally
val graph = Graph(vertices, edges)