Как преобразовать RDD [(String, Iterable [VertexId])] в DataFrame? - PullRequest
0 голосов
/ 08 февраля 2019

Я создал RDD из Graphx, который выглядит следующим образом:

val graph = GraphLoader.edgeListFile(spark.sparkContext, fileName)
var s: VertexRDD[VertexId] = graph.connectedComponents().vertices

val nodeGraph: RDD[(String, Iterable[VertexId])] = s.groupBy(_._2) map { case (x, y) =>
  val rand = randomUUID().toString
  val clusterList: Iterable[VertexId] = y.map(_._1)
  (rand, clusterList)
}

nodeGraph имеет тип RDD[(String, Iterable[VertexId])], и данные внутри будут иметь форму:

(abc-def11, Iterable(1,2,3,4)), 
(def-aaa, Iterable(10,11)), 
...

Что я хочу сейчас сделать, это создать из него фрейм данных, который должен выглядеть следующим образом:

col1        col2
abc-def11   1
abc-def11   2
abc-def11   3
abc-def11   4
def-aaa     10
def-aaa     11

Как это сделать в Spark?

1 Ответ

0 голосов
/ 08 февраля 2019

Сначала преобразуйте RDD в кадр данных, используя toDF() с нужными именами столбцов.Это проще всего сделать, изменив сначала Iterable[VertexId] на Seq[Long].

import spark.implicits._
val df = nodeGraph.map(x => (x._1, x._2.map(_.toLong).toSeq)).toDF("col1", "col2")

Обратите внимание, что это можно сделать при создании nodeGraph для сохранения шага.Затем с помощью функции explode выровняйте кадр данных,

val df2 = df.withColumn("col2", explode($"col2"))

, что даст вам желаемый результат.

...