Как разобрать большой текстовый файл в dataframe и записать в паркет с помощью Spark - PullRequest
0 голосов
/ 11 декабря 2018

В настоящее время я пытаюсь проанализировать большой текстовый файл, содержащий один столбец с именем «value», и хочу разделить его на несколько столбцов с фиксированной шириной на столбец.

Ввод выглядит следующим образом:

value
0000000206001060010001020020722E5549.73177802...
0000000206001060010001120030203E5549.73177802...
0000000206001060010001220030717E5549.73177802..
0000000206001060010001320040714E5549.73177802...
0000000206001060010001420050713E5549.73177802...
0000000206001060010001520060514E5549.73177802...
...

Я попытался сделать это следующим образом:

val dfRawInput = spark.read.text("C:/input.txt")
val dfInputColumnized = {
    dfRawInput.select(
      $"value".substr(1,8).cast("string").alias("column1"),
      $"value".substr(9,5).cast("string").alias("column2"),
      ...
      $"value".substr(x,y).cast("string").alias("columnZ"),
}
dfInputColumnized.write.parquet("C:/dfInputColumnized")

Однако это дает мне следующую ошибку

2018-12-11 13: 26: 31.331ОШИБКА --- [rker для задачи 7] org.apache.spark.util.Utils: прерывание задачи

java.lang.ArrayIndexOutOfBoundsException: 63 в org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte (UTF8String.java:191) в org.apache.spark.unsafe.types.UTF8String.numChars (UTF8String.java:206) в org.apache.spark.unsafe.types.UTF8String.substringSQL (UTF8String.java:309 at).apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIteratorForCodegenStage1.processNext (неизвестный источник) в org.apache.spark.sql.execution.BufferedRowIterator.hasNext (BufferedRowIterator.java:43) в org.apache.execution.WholeStageCodegenExec $$ anonfun $ 10 $$ anon $ 1.hasNext (WholeStageCodegenExec.scala: 614) в scala.collection.Iterator $$ anon $ 11.hasNext (Iterator.scala: 409) в org.apache.spark.sql.execution.datasources.FileFormatWriter $ SingleDirectoryWriteTas.execute (FileFormatWriter.scala: 380) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ org $ apache $ spark $ sql $ execute $ dasasources $ FileFormatWriter $$ executeTask $ 3.apply (FileFormatWriter.scala:269) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ org $ apache $ spark $ sql $ исполнительный $ источники данных $ FileFormatWriter $$ executeTask $ 3.apply (FileFormatWriter.scala: 267) в org.apache.spark.util.Utils $ .tryWithSafeFinallyAndFailureCallbacks (Utils.scala: 1411) по адресу org.apache.spark.sql.execution.datasources.FileFormatWriter $ .org $ apache $ spark $ sql $ выполнение $ источники данных (FileFormatWritk $ execute).scala: 272) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply (FileFormatWriter.scala: 197) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply (FileFormatWriter.scala: 196) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в org.apache.spark.scheduler.as.выполнить (Task.scala: 109) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 345) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748)

2018-12-11 13: 26: 33.323 ОШИБКА ---[rker для задачи 7] oassedatasources.FileFormatWriter: задание job_20181211132630_0000 прервано.2018-12-11 13: 26: 33.334 ОШИБКА --- [rker для задачи 7] org.apache.spark.executor.Executor: Исключение в задаче 7.0 на этапе 0.0 (TID 7)

org.apache.spark.SparkException: задача не выполнилась при записи строк.

Я предполагаю, что это из-за плохих записей, поэтому я попытался перехватить это так:

val dfRawInput = spark.read.text("C:/input.txt")
val dfInputColumnized = {
        dfRawInput
          .withColumn("column1",when(length($"value")>=lit(8),$"value".substr(1,8)).otherwise(null))
          .withColumn("column2",when(length($"value")>=lit(13),$"value".substr(9,5)).otherwise(null))
          ...
          .withColumn("columnZ",when(length($"value")>=lit(x+y-1),$"value".substr(x,y)).otherwise(null))
}
dfInputColumnized.write.parquet("C:/dfInputColumnized")

Ошибкатеперь читает:

java.lang.ArrayIndexOutOfBoundsException: null

2018-12-11 13: 30: 45.904 ОШИБКА --- [ker для задачи 24] oassedatasources.FileFormatWriter: Jobjob_20181211104644_0006 прерван.2018-12-11 13: 30: 45.904 ОШИБКА --- [ker для задачи 24] org.apache.spark.executor.Executor: Исключение в задаче 7.0 на этапе 6.0 (TID 24)

org.apache.spark.SparkException: задача не выполнена при записи строк.в org.apache.spark.sql.execution.datasources.FileFormatWriter $ .org $ apache $ spark $ sql $ исполнительный $ источники данных $ FileFormatWriter $$ executeTask (FileFormatWriter.scala: 285) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply (FileFormatWriter.scala: 197) вorg.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply (FileFormatWriter.scala: 196) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) по адресу.apache.spark.scheduler.Task.run (Task.scala: 109) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 345) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748). Причина: java.lang.ArrayIndexOutOfBoundsException: null * 31*

Это действительно из-за поврежденных строк?
Есть ли способ пропустить тех, кто использует Spark?
Есть ли лучший способ сделать это?(Я не обязательно связан со Spark)

...