Как установить "spark.hadoop.ignoreInputErrors" в true в pyspark? - PullRequest
0 голосов
/ 17 мая 2019

Я пытаюсь создать программу pyspark для подсчета количества строк, содержащих определенный шаблон из файлов gzip в hdfs.

Я пробовал код, упомянутый ниже, для одного файла, а также получил результат = 28451326. Но запуск кода более 20+ файлов вызывает проблему.

Обнаружено обсуждение здесь о том, как пропустить поврежденную часть, используя "spark.hadoop.ignoreInputErrors" и продолжить работу, но я не могу использовать ее в pyspark.

flies=sc.textFile("/path-to/my/NginxLogs/today/*_2019-05-16.access.log.gz")
flies.take(10)
rdd = flies.filter(lambda x: "daily" in x)
rdd.take(2)
rdd.count
.
.
.
.
[Stage 8:======================>                                   (7 + 4) / 18]19/05/17 10:28:13 WARN TaskSetManager: Lost task 6.3 in stage 8.0 (TID 21, HDP-slave-3, executor 1): java.io.EOFException: Unexpected end of input stream
        at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:145)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:271)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:518)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1948)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

Я ожидаю, что результатом будет количество всех строк, содержащих строку "daily". Может быть около 50000000.

...