SPARK Scala Ошибка при преобразовании RDD с массивом [String] в DF через createDataFrame - PullRequest
0 голосов
/ 30 сентября 2018

Это ниже отлично работает:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}

val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId =  spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)

Эта структура:

rddWithZipId: org.apache.spark.rdd.RDD[((String, Int, Array[String]), Long)] = ZippedWithIndexRDD[149] at zipWithIndex at command-2467674133341972:32

выдает ошибку, когда я делаю то же самое, что и в предыдущем примере.Единственное отличие - массив [String].

Произошла ошибка:

notebook:45: error: value toSeq is not a member of (String, Int, Array[String])
val dfPosts =  spark.createDataFrame(rddWithZipId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)

Оглядываясь по сторонам, я не понимаю, почему это не сработает.Я отмечаю, что, хотя я вижу 4 типа / элемента в СДР, я могу заметить, что строка рассматривается как 1 структура.

Есть идеи?Возможны и другие способы, но я не понимаю, почему первый пример работает, а второй нет?Похоже, виноват Array [String].На самом деле так и должно быть, но как обойти это?

Просто сделав это:

val dfPosts =  rddWithZipId.toDF()

возвращает вложенную схему, как показано ниже, так что, возможно, дело в этом, но тем не менее вопрос остается таким же, как указано выше.Вложенная структура означает, что я могу достичь того, что хочу, но это не проблема.

root
  |-- _1: struct (nullable = true)
  |    |-- _1: string (nullable = true)
  |    |-- _2: integer (nullable = false)
  |    |-- _3: array (nullable = true)
  |    |    |-- element: string (containsNull = true)
  |-- _2: long (nullable = false)

Я думаю, что вложенную структуру нужно как-то определить.

Заранее спасибо.

1 Ответ

0 голосов
/ 30 сентября 2018

Возможны и другие способы, но я не могу понять, почему первый пример работает, а второй нет?Похоже, виноват массив [String].

Он не имеет ничего общего с содержимым первого элемента.Это все о типах.Если взглянуть на типы rddWithId - это

RDD[(Row, Long)]

, а вторая структура -

RDD[((String, Int, Array[String]), Long)]

Так, в первом случае _1 - это org.apache.spark.sql.Row, что обеспечивает toSeq метод, тогда как во втором случае _1 равен Tuple3[_, _, _], который не обеспечивает такой метод.

Если вы хотите, чтобы это работало

Row.fromSeq(row.toSeq ++ Array(index))

, замените

Row.fromSeq(("a", 1, Array("foo")).productIterator.toSeq ++ Array(index))

или лучше (зачем инициализировать дополнительные Array для каждого вызова?)

Row.fromSeq(("a", 1, Array("foo")).productIterator.toSeq :+ index)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...