Конвертировать RDD [String] во фрейм данных - PullRequest
0 голосов
/ 02 мая 2019

У меня есть RDD [String] с этой формой:

VAR1,VAR2,VAR3,VAR4, ...
  a ,  b ,  c ,  d , ...
  e ,  f ,  g ,  h , ...

Это означает, что первая строка - это разделенные запятыми заголовки, а все последующие строки - мои данные, также разделенные запятыми.

Моя цель - преобразовать этот неструктурированный RDD в DataFrame следующим образом:

_____________________
|VAR1|VAR2|VAR3|VAR4| 
|----|----|----|----|
|  a |  b |  c |  d | 
|  e |  f |  g |  h | 

Я попытался использовать метод toDF (), который преобразует RDD [tuples] в Dataframe.Но преобразование из RDD [String] в RDD [tuples] звучит нереально в отношении моего количества переменных (более 200).

Другим решением должно быть использование метода

sqlContext.createDataFrame(rdd, schema)

который требует преобразования моего RDD [String] в RDD [Row] и преобразования моего заголовка (первая строка RDD) в схему: StructType, но я не знаю, как создать эту схему.

Любое решение для преобразования RDD [String] в Dataframe с заголовком было бы очень хорошо.

Заранее спасибо.

Ответы [ 2 ]

3 голосов
/ 02 мая 2019

Вы также можете достичь этого результата примерно так:

val data = Seq(
  ("VAR1, VAR2, VAR3, VAR4"),
  ("a, b, c, d"),
  ("ae, f, g, h")
)

val dataDS = sc.parallelize(data).toDS
val result = spark.read.option("inferSchema","true").option("header","true").csv(dataDS)

result.printSchema

result.show

Вывод выше:

root
 |-- VAR1: string (nullable = true)
 |--  VAR2: string (nullable = true)
 |--  VAR3: string (nullable = true)
 |--  VAR4: string (nullable = true)

и

+----+-----+-----+-----+
|VAR1| VAR2| VAR3| VAR4|
+----+-----+-----+-----+
|   a|    b|    c|    d|
|  ae|    f|    g|    h|
+----+-----+-----+-----+

Если ваши данные содержали цифры в одном из столбцов (исключая заголовок), тогда «inferSchema» должна правильно выводить этот столбец как числовой тип. Например, используя это в качестве входных данных:

val data = Seq(
  ("VAR1, VAR2, VAR3, VAR4"),
  ("a,   1, c, d"),
  ("ae, 10, g, h")
)

Вывод будет:

root
 |-- VAR1: string (nullable = true)
 |--  VAR2: double (nullable = true)
 |--  VAR3: string (nullable = true)
 |--  VAR4: string (nullable = true)

и

+----+-----+-----+-----+
|VAR1| VAR2| VAR3| VAR4|
+----+-----+-----+-----+
|   a|  1.0|    c|    d|
|  ae| 10.0|    g|    h|
+----+-----+-----+-----+

Надеюсь, это поможет.

1 голос
/ 02 мая 2019

Схема может быть создана из первого ряда:

val data = Seq(
  ("VAR1, VAR2, VAR3, VAR4"),
  ("a, b, c, d"),
  ("ae, f, g, h")
)
val rdd = sparkContext.parallelize(data).map(_.split(","))

val firstRow = rdd.first()
val schemaFields = firstRow.map(n => StructField(n, StringType, true))

val remaining = rdd.zipWithIndex().filter(_._2 > 0).keys.map(v => Row(v.toSeq: _*))
val result = spark.createDataFrame(remaining, StructType(schemaFields))
result.show(false)

Вывод:

+----+-----+-----+-----+
|VAR1| VAR2| VAR3| VAR4|
+----+-----+-----+-----+
|a   | b   | c   | d   |
|ae  | f   | g   | h   |
+----+-----+-----+-----+
...