val data = spark.read
.text(filePath)
.toDF("val")
.withColumn("id", monotonically_increasing_id())
val count = data.count()
val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)
val columns = header
.replace("H|*|", "")
.replace("|##|", "")
.split("\\|\\*\\|")
val structSchema = StructType(columns.map(s=>StructField(s, StringType, true)))
var correctData = data.where('id > 1 && 'id < count-1).select("val")
var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
})
val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)
display(finalDF)
Эта часть кода дает ошибку:
Исключение в потоке "dispatcher-event-l oop -0" java .lang.OutOfMemoryError: Java пространство кучи
После нескольких часов отладки, в основном, детали:
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
})
val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)
, вызвавшей ошибку.
Я изменил деталь как
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
}).toList
val finalDF = sqlContext.createDataFrame(sc.makeRDD(dataArr),structSchema)
Но ошибка остается такой же. Что мне следует изменить, чтобы избежать этого?
Когда я запускал этот код, это спарк-кластер блоков данных, определенное задание выдает эту ошибку драйвера Spark:
Задание прервано из-за сбоя этапа: Задача с сериализацией 45: 0 было 792585456 байт, что превышает максимально допустимое значение: spark.rp c .message.maxSize (268435456 байт).
Я добавил эту часть кода:
spark.conf.set("spark.rpc.message.maxSize",Int.MaxValue)
, но бесполезно.