Это ниже отлично работает:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId = spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
Эта структура:
rddWithZipId: org.apache.spark.rdd.RDD[((String, Int, Array[String]), Long)] = ZippedWithIndexRDD[149] at zipWithIndex at command-2467674133341972:32
выдает ошибку, когда я делаю то же самое, что и в предыдущем примере.Единственное отличие - массив [String].
Произошла ошибка:
notebook:45: error: value toSeq is not a member of (String, Int, Array[String])
val dfPosts = spark.createDataFrame(rddWithZipId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
Оглядываясь по сторонам, я не понимаю, почему это не сработает.Я отмечаю, что, хотя я вижу 4 типа / элемента в СДР, я могу заметить, что строка рассматривается как 1 структура.
Есть идеи?Возможны и другие способы, но я не понимаю, почему первый пример работает, а второй нет?Похоже, виноват Array [String].На самом деле так и должно быть, но как обойти это?
Просто сделав это:
val dfPosts = rddWithZipId.toDF()
возвращает вложенную схему, как показано ниже, так что, возможно, дело в этом, но тем не менее вопрос остается таким же, как указано выше.Вложенная структура означает, что я могу достичь того, что хочу, но это не проблема.
root
|-- _1: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: integer (nullable = false)
| |-- _3: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- _2: long (nullable = false)
Я думаю, что вложенную структуру нужно как-то определить.
Заранее спасибо.