Я полагаю, что в Spark / Scala можно написать множество версий решения для загрузки графика из указанного вами формата файла.
Вот пример динамического решения с использованием СДР:
// Loading sample data
scala> val graphData = sc.parallelize(Seq("1, 8, 9, 10", "2,5,6,7,3,1"))
graphData: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5]
// Trim whitespaces and map the String into an Array[Long]
scala> val graphList = graphData.map( x => {
| x.replace(" ", "").split(",").map(_.toLong)
| })
graphList: org.apache.spark.rdd.RDD[Array[Long]] = MapPartitionsRDD[6]
// Here is how graphList looks like now
scala> graphList.collect
res11: Array[Array[Long]] = Array(Array(1, 8, 9, 10), Array(2, 5, 6, 7, 3, 1))
// Generating edges by crossProduct element(0) with the rest of Array elements
scala> val edges = graphList.flatMap(x => x.drop(1).map(y => (x(0), y) )).map(x => Edge(x._1, x._2, "attr"))
edges: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,8,attr), Edge(1,9,attr), Edge(1,10,attr), Edge(2,5,attr), Edge(2,6,attr), Edge(2,7,attr), Edge(2,3,attr), Edge(2,1,attr))
// Generating vertices, and adding name/attr for each vertex
scala> val vertices = graphList.flatMap(x => x).map(x => (x, ("name", "attr"))).distinct.sortBy(x => x)
vertices: org.apache.spark.rdd.RDD[(Long, (String, String))] = MapPartitionsRDD
//A default value is defined in case a connection or vertex is missing; the graph is then constructed from the RDD-based structures vertices and edges and the default record:
val default = ("Unknown", "Missing")
// Finally, declare your Graph
scala> val graph = Graph(vertices, edgesRDD, default)
graph: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@8097e8f
// Checking how vertices look like
scala> graph.vertices.collect
res26: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((8,(name,attr)), (1,(name,attr)), (9,(name,attr)), (10,(name,attr)), (2,(name,attr)), (3,(name,attr)), (5,(name,attr)), (6,(name,attr)), (7,(name,attr)))
Примечание вам также следует рассмотреть возможность секционирования (для параллелизма) и кэширования (вершины, ребра) для дальнейшей оптимизации вашей работы.
Лучший способ построения графика
GraphFrames теперь является лучшей альтернативой GraphX, которая выигрывает от масштабируемости и высокой производительности DataFrames.
Я рекомендую вам прочитать об этом и начать использовать его, если это возможно.
Более собственный формат для представления графика для GraphX или GraphFrames
В качестве примера, здесь файл вершины содержит всего шесть строк. Каждая вершина представляет человека и имеет идентификационный номер вершины, имя и атрибуты, в данном случае значение возраста:
1,Mike,48
2,Sarah,45
3,John,25
4,Jim,53
5,Kate,22
6,Flo,52
Другой файл ребер содержит набор значений направленных ребер в идентификаторе исходной вершины формы, идентификаторе конечной вершины и отношении. Итак, запись 1 формирует отношения Сестры между Фло и Майком:
6,1,Sister
1,2,Husband
2,1,Wife
5,1,Daughter
5,2,Daughter
3,1,Son
3,2,Son
4,1,Friend
1,5,Father
1,3,Father
2,5,Mother
2,3,Mother
Теперь ваш код станет таким простым:
val vertex = spark.read.option("header","true").load("csvgraph1_vertex.csv")
val edges = spark.read.option("header","true").load("csvgraph1_edges.csv")
val graph = GraphFrame(vertex, edges)
Обновление
Интеграция GraphFrames с GraphX
GraphFrames полностью интегрируются с GraphX посредством преобразований между двумя представлениями без потери данных. Мы можем конвертировать наши графики в график GraphX и обратно в GraphFrame.
val gx: Graph[Row, Row] = g.toGraphX()
val g2: GraphFrame = GraphFrame.fromGraphX(gx)