Я пытаюсь добавить новый столбец в каждую строку DataFrame, как это
def addNamespace(iter: Iterator[Row]): Iterator[Row] = {
iter.map (row => {
println(row.getString(0))
// Row.fromSeq(row.toSeq ++ Array[String]("shared"))
val newseq = row.toSeq ++ Array[String]("shared")
Row(newseq: _*)
})
iter
}
def transformDf(source: DataFrame)(implicit spark: SparkSession): DataFrame = {
val newSchema = StructType(source.schema.fields ++ Array(StructField("namespace", StringType, nullable = true)))
val df = spark.sqlContext.createDataFrame(source.rdd.mapPartitions(addNamespace), newSchema)
df.show()
df
}
Но я продолжаю получать эту ошибку - Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
в строке df.show()
Можеткто-нибудь, пожалуйста, помогите разобраться в этом.Я искал в нескольких сообщениях, но все, что я пытался, дает мне эту ошибку.
Я также пытался val again = sourceDF.withColumn("namespace", functions.lit("shared"))
, но у него та же проблема.
Схема уже прочитанных данных
root
|-- name: string (nullable = true)
|-- data: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- description: string (nullable = true)
| |-- activates_on: timestamp (nullable = true)
| |-- expires_on: timestamp (nullable = true)
| |-- created_by: string (nullable = true)
| |-- created_on: timestamp (nullable = true)
| |-- updated_by: string (nullable = true)
| |-- updated_on: timestamp (nullable = true)
| |-- properties: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)