Невозможно создать df с указанной схемой - PullRequest
0 голосов
/ 14 мая 2019

Когда я пытаюсь создать фрейм данных со схемой в приведенном ниже коде, он не работает, и если это было сделано без схемы, каждый столбец данных забивается в один столбец

#transformations
val t3 = t1.map{case(a)=>(a(1).toInt,a(2))}.reduceByKey((x,y)=> `  
(x+","+y)).map{case(a,b)=>parse(a,b)}  

Функция анализавозвращает массив [Int].

Код показать здесь

`t3.collect()`  
res7: Array[Array[Int]] = Array(Array(100, 1, 1, 0, 0, 0, 2), Array(104,  
2, 0, 0, 0, 1, 3))  
#schema column names
`temp`  
res11: List[String] = List(id, review, inprogress, notstarted, completed,   
started, total)  

`val fields = temp.map(fieldName => StructField(fieldName,   
IntegerType, nullable = true))`  
fields: List[org.apache.spark.sql.types.StructField]  
#creating schema
`val schema = StructType(fields)`  
org.apache.spark.sql.types.StructType  

`val df = t3.toDF()`  
org.apache.spark.sql.DataFrame = [value: array<int>]  

`df.show()`  
+--------------------+  
|               value|  
+--------------------+  
|[100, 1, 1, 0, 0,...|  
|[104, 2, 0, 0, 0,...|  
+--------------------+  

`val df = t3.toDF(schema)`  
error: type mismatch;  


`val df = spark.createDataFrame(t3)`  
<console>:35: error: overloaded method value createDataFrame with   
alternatives  

Expected:  
+---+---------+----------+----------+------+-------+-----+  
| id|completed|inprogress|notstarted|review|started|total|  
+---+---------+----------+----------+------+-------+-----+  
|100|        0|         1|         0|     1|      0|    2|  
|104|        0|         0|         0|     2|      1|    3|  
+---------+---+----------+----------+------+-------+-----+  

Ответы [ 2 ]

0 голосов
/ 16 мая 2019

RDD [Array [Int]] с проанализированными данными можно преобразовать в RDD [Row] и затем преобразовать в DataFrame:

val parsedData = Array(Array(100, 1, 1, 0, 0, 0, 2), Array(104,
  2, 0, 0, 0, 1, 3))
val rddAfterParsing = sparkContext.parallelize(parsedData)
val rddOfRows = rddAfterParsing.map(arr => Row(arr: _*))

val columnNames = Seq("id", "review", "inprogress", "notstarted", "completed", "started", "total")
val fields = columnNames.map(fieldName => StructField(fieldName,
  IntegerType, nullable = true))
val result = spark.createDataFrame(rddOfRows, StructType(fields))

result.show(false)

Выход:

+---+------+----------+----------+---------+-------+-----+
|id |review|inprogress|notstarted|completed|started|total|
+---+------+----------+----------+---------+-------+-----+
|100|1     |1         |0         |0        |0      |2    |
|104|2     |0         |0         |0        |1      |3    |
+---+------+----------+----------+---------+-------+-----+
0 голосов
/ 14 мая 2019

из документации на искру у вас есть:

def toDF(colNames: String*): DataFrame

Но вы передаете StructType экземпляр функции toDF.

Вы можете создать свой второй Dataframe с помощью t3.toDF(temp:_*) (который преобразуется в toDF("id",.., "total")

Furtermore, вы должны использовать Array[(Int,..,Int)] вместо Array[Array[Int]]

...