The code works fine when I write this:
def getData(filePath: String, recordSeperator: String, columnSeperator: String): DataFrame = {
  val data = spark.read
    .text(filePath)
    .toDF("val")
    .withColumn("id", monotonically_increasing_id())
  val count = data.count()
  val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)
  val columns = header
    .replace("H|*|", "")
    .replace(recordSeperator, "")
    .split(columnSeperator)
  import spark.implicits._
  val correctData = data.where('id > 1 && 'id < count - 1)
  val splitIntoCols = correctData.rdd.map(s => {
    val arr = s.getString(0).replace("|##|", "").split("\\|\\*\\|")
    //val arr = s.getString(0)
    //val arr1 = arr.replace(recordSeperator, "").split(columnSeperator)
    //println(arr1)
    RowFactory.create(arr: _*)
  })
  val struct = StructType(columns.map(s => StructField(s, StringType, true)))
  val finalDF = spark.createDataFrame(splitIntoCols, struct)
  return finalDF
}
But it throws an error when I write this:
def getData(filePath: String, recordSeperator: String, columnSeperator: String): DataFrame = {
  val data = spark.read
    .text(filePath)
    .toDF("val")
    .withColumn("id", monotonically_increasing_id())
  val count = data.count()
  val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)
  val columns = header
    .replace("H|*|", "")
    .replace(recordSeperator, "")
    .split(columnSeperator)
  import spark.implicits._
  val correctData = data.where('id > 1 && 'id < count - 1)
  val splitIntoCols = correctData.rdd.map(s => {
    //val arr = s.getString(0).replace("|##|", "").split("\\|\\*\\|")
    val arr = s.getString(0)
    val arr1 = arr.replace(recordSeperator, "").split(columnSeperator)
    RowFactory.create(arr1: _*)
  })
  val struct = StructType(columns.map(s => StructField(s, StringType, true)))
  val finalDF = spark.createDataFrame(splitIntoCols, struct)
  return finalDF
}
Here recordSeperator is "|##|" and columnSeperator is "\|\*\|".

ERROR:
"stackTrace": "org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
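For reference, the failing version would presumably be invoked along these lines; the file path below is a placeholder, and the separators are written as Scala string literals matching the values stated above:

// Hypothetical call site; only the separator values are taken from the question.
// "\\|\\*\\|" is the Scala literal for the regex \|\*\| used by split().
val df = getData("/path/to/input.txt", "|##|", "\\|\\*\\|")
df.show(false)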