Второе обновление для изменения rdd на строку
@ USML , в основном изменено Seq [String] на Row так что rdd может быть paralellized . это распределенная параллельная коллекция, которую необходимо сериализовать
val df2 = csvDf.rdd.map(process(_)).map(a => Row.fromSeq(a))
//df2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
// And we use dynamic Schema (e.g. same number of columns as csv
spark.createDataFrame(df2, schema = dynamicSchema).show(false)
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|X1 |X2 |X3 |
|Y1 |Y2 |Y3 |
+---+---+---+
Обновление измененного требования Пока вы читаете CSV,конечный вывод будет иметь те же числа столбцов в качестве вашего CSV, поскольку мы используем df.schema для создания фрейма данных после вызова процесс метод. Попробуйте это:
val df = spark.read.format("csv").load("data.csv")
val dynamicSchema = df.schema // This makes sure to prserve same number of columns
def process(line: Row) : Seq[String] = {
val list = new ArrayList[String]
for (i <- 0 to line.size-1) {
list.add(line.getString(i).toUpperCase)
}
list.asScala.toSeq
}
val df2 = df.rdd.map(process(_)).map(a => Row.fromSeq(a)) // df2 is actually an RDD // updated conversion to Row
val finalDf = spark.createDataFrame(df2, schema = dynamicSchema) // We use same schema
finalDf.show(false)
Содержимое файла =>
cat data.csv
a1,b1,c1,d1
a2,b2,c2,d2
Код =>
import org.apache.spark.sql.Row
val csvDf = spark.read.csv("data.csv")
csvDf.show(false)
+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|a1 |b1 |c1 |d1 |
|a2 |b2 |c2 |d2 |
+---+---+---+---+
def process(cols: Row): Row = { Row("a", "b", "c","d") } // Check the Data Type
val df2 = csvDf.rdd.map(process(_)) // df2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
val finalDf = spark.createDataFrame(df2,schema = csvDf.schema)
finalDf.show(false)
+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|a |b |c |d |
|a |b |c |d |
+---+---+---+---+
Очкик сведению Строка необходим тип данных для Карта Строка
Лучше иметь класс безопасности типа Отдых должен быть легким