val data = spark.read
.text(filepath)
.toDF("val")
.withColumn("id", monotonically_increasing_id())
val count = data.count()
val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)
val columns = header
.replace("H|*|", "")
.replace("|##|", "")
.split("\\|\\*\\|")
val structSchema = StructType(columns.map(s=>StructField(s, StringType, true)))
var correctData = data.where('id > 1 && 'id < count-1).select("val")
val dataArr = correctData.rdd.map(s=> {
val a = s.getString(0).replace("\\\n","").replace("\\\r","")
var b = a.replace("|##|", "").split("\\|\\*\\|")
while(b.length < columns.length) b = b :+ ""
RowFactory.create(b:_*)
})
val finalDF = spark.createDataFrame(dataArr,structSchema)
Этот код работает нормально, когда я читаю файл, содержащий до 50 000 строк +, но когда файл содержит строк больше, этот код начинает терять данные. Когда этот код читает файл, содержащий более 1 миллиона строк, окончательный счетчик данных дает только 65k + строк данных. Я не могу понять, где происходит проблема в этом коде и что нужно изменить в этом коде, чтобы он принимал все данные в конечном кадре данных. ps - самый высокий файл, который этот код будет принимать, имея почти 14 миллионов + строк, в настоящее время этот код принимает только 2 миллиона строк из них.