Обрабатывать текстовый файл с помощью Spark - PullRequest
3 голосов
/ 08 июня 2019

Мне нужно прочитать текстовый файл в набор данных [T] в Spark.Файл не отформатирован должным образом, так как в нем есть несколько пустых полей, и трудно определить параметр для разделения строки.Я пытался прочитать данные в RDD, а затем преобразовать их в тип класса дела, однако не все поля анализируются должным образом, и я получаю сообщение об ошибке:

java.lang.NumberFormatException: empty String
        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
        at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
        at java.lang.Double.parseDouble(Double.java:538)
        at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
        at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
        at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
        at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
        at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Что я могу сделать, чтобы обработать этофайл правильно?Мой файл .txt выглядит так (анимированные случайные данные, но формат тот же):

NEW50752085  84.0485 -76.3851  85.1   THE NAME OF AN OBJECT                       
DEM00752631  51.9581 -85.3315  98.5   THE NAME OF AN OBJECT                                  
KI004867205  40.8518  15.9351 276.5   THE NAME OF AN OBJECT           FHG   41196

Я пытался обработать его таким образом:

    val dataRdd = spark.sparkContext
      .textFile("file.txt")

    val dataArray = dataRdd
      .map(_.split(" "))

  case class caseClass(
    c1: String,
    c2: Double,
    c3: Double,
    c4: Double,
    c5: String,
    c6: String,
    c7: String
  )

    val df = dataArray
      .map(record => (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4), record(5), record(6)))
      .map{case (c1, c2, c3, c4, c5, c6, c7) => CaseClass(c1, c2, c3, c4, c5, c6, c7)
      }.toDF()

Ответы [ 2 ]

1 голос
/ 08 июня 2019

Я собираюсь сделать некоторые предположения в этом ответе, которые могут быть неверными, но я полагаю, что они верны на основе предоставленных вами данных и предоставленных ошибок.

  • Предположение 1: Ваши данныеразделенный пробелами, несколькими пробелами.Я пришел к этому предположению на основе вашего NumberFormatException пустых строк, которые вы предоставили.Если бы ваш файл был разделен вкладками, мы бы не столкнулись с этим.
  • Предположение 2 (Это для моего собственного ума, но может быть и не так): Каждый элемент данных разделяется пробеломтакое же количество пробелов.В оставшейся части этого ответа я буду считать, что количество пробелов равно четырем.Если это предположение не так, это становится гораздо более сложной проблемой.
  • Предположение 3: только последние 2 из 7 элементов данных являются необязательными и иногда не отображаются.

Исключение NumberFormatException вызвано разделением на один пробел.Предположим, что следующая строка разделена пробелами:

NEW50752085    84.0485    -76.3851    85.1    THE NAME OF AN OBJECT 

Когда вы разбиваете на один пробел, эта строка преобразуется в следующий массив:

Array(NEW50752085, "", "", "", 84.0485, "", "", "", -76.3851, "", "", "", 85.1, "", "", "", THE, NAME, OF, AN, OBJECT)

Второй элемент этого массива,которая является пустой строкой, это то, что вы пытаетесь преобразовать в Double.Это то, что дает вам NumberFormatException для пустой строки.

.map(_.split("    "))

Когда вы изменяете это, чтобы разделить на 4 пробела (основываясь на моем предположении, которое может или не может быть правдой), вы получаете следующее:

Array(NEW50752085, 84.0485, -76.3851, 85.1, THE NAME OF AN OBJECT)

Но теперь мы столкнулись с другой проблемой - в ней всего пять элементов!Мы хотим семь.

Мы можем изменить это, изменив ваш более поздний код:

val df = dataArray.map(record => {
  (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4), 
  if(record.size > 5) record(5) else "",
  if(record.size > 6) record(6) else "")
}).map{case (c1, c2, c3, c4, c5, c6, c7) => caseClass(c1, c2, c3, c4, c5, c6, c7)}.toDF
df.show
+-----------+-------+--------+----+--------------------+---+-----+
|         c1|     c2|      c3|  c4|                  c5| c6|   c7|
+-----------+-------+--------+----+--------------------+---+-----+
|NEW50752085|84.0485|-76.3851|85.1|THE NAME OF AN OB...|   |     |
|DEM00752631|51.9581|-85.3315|98.5|THE NAME OF AN OB...|   |     |
|KI004867205|40.8518| 15.9351|76.5|THE NAME OF AN OB...|FHG|41196|
+-----------+-------+--------+----+--------------------+---+-----+

Опять же, этот подход будет работать, только если все элементы ограничены одинаковым количеством пробелов.

0 голосов
/ 08 июня 2019

Если ваши данные не имеют закрытого формата, читаемого с помощью spark, единственный вариант, который у вас есть, - это создать свой собственный ридер, используя FileInputFormat

Таким образом, вы сможете определить поток разрешения для каждой строки ваших данных, чтобы определить, как разбивать и справляться с крайними случаями.

Лучший способ погрузиться в это на собственном примере. Это довольно солидный: https://www.ae.be/blog-en/ingesting-data-spark-using-custom-hadoop-fileinputformat/

...