потеря данных при чтении файла огромного размера в spark scala - PullRequest
0 голосов
/ 16 марта 2020
val data = spark.read
    .text(filepath)
    .toDF("val")
    .withColumn("id", monotonically_increasing_id())
val count = data.count()

val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)

val columns = header
.replace("H|*|", "")
.replace("|##|", "")
.split("\\|\\*\\|")

val structSchema = StructType(columns.map(s=>StructField(s, StringType, true)))

var correctData = data.where('id > 1 && 'id < count-1).select("val")

val dataArr = correctData.rdd.map(s=> {
      val a = s.getString(0).replace("\\\n","").replace("\\\r","")
      var b = a.replace("|##|", "").split("\\|\\*\\|")

      while(b.length < columns.length) b = b :+ ""
      RowFactory.create(b:_*)
    })

val finalDF = spark.createDataFrame(dataArr,structSchema)

Этот код работает нормально, когда я читаю файл, содержащий до 50 000 строк +, но когда файл содержит строк больше, этот код начинает терять данные. Когда этот код читает файл, содержащий более 1 миллиона строк, окончательный счетчик данных дает только 65k + строк данных. Я не могу понять, где происходит проблема в этом коде и что нужно изменить в этом коде, чтобы он принимал все данные в конечном кадре данных. ps - самый высокий файл, который этот код будет принимать, имея почти 14 миллионов + строк, в настоящее время этот код принимает только 2 миллиона строк из них.

1 Ответ

1 голос
/ 16 марта 2020

Похоже, что связано с Как добавить постоянный столбец идентификаторов строк в Spark DataFrame?

т.е. избегать использования monotonically_increasing_id и следовать некоторым советам из этого потока.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...