How to fix "SparkException: Task failed while writing rows" on AWS EMR when there are many files to write
May 23, 2019

I am writing a Spark application on AWS EMR that processes about 7 TB of Common Crawl data (mainly filtering out useful web pages for further study) and then writes the filtered pages to compressed text files (approximately 300 GB). The source dataset contains 56,000 subfiles, each approximately 100 MB compressed. I experimented with only part of the data (about 500 subfiles), and that run completed successfully with results that look good. But when I run the application on the whole dataset, I always get a "File already exists" error. For reference, I used 30 c4.8xlarge machines (root device EBS volume size 20 GB), one of them as the master.

I searched for this error online, and one explanation says:

"A Spark task may fail for some other reason. After the retries that follow the original error, it finally throws this 'IOException: File already exists'."
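
One way to act on that explanation is to look for the earliest failed attempt of each task in the driver log, since later attempts may only show the secondary error. The following is a minimal sketch, assuming the driver log contains `Lost task <task>.<attempt> in stage ...` lines in the format shown in the logs further down; `FirstFailure`, `TaskFailure`, `parse`, and `firstFailures` are hypothetical names introduced for illustration.

```scala
object FirstFailure {
  // Matches driver log lines of the form (assumed from the logs in this post):
  // "... Lost task 1782.0 in stage 3.0 (TID 1788, host, executor 13): <exception>"
  private val LostTask =
    """Lost task (\d+)\.(\d+) in stage (\d+)\.\d+ \(TID \d+[^)]*\): (.*)""".r.unanchored

  final case class TaskFailure(task: Int, attempt: Int, stage: Int, reason: String)

  /** Extracts the task, attempt, stage, and reason from one log line, if present. */
  def parse(line: String): Option[TaskFailure] = line match {
    case LostTask(task, attempt, stage, reason) =>
      Some(TaskFailure(task.toInt, attempt.toInt, stage.toInt, reason))
    case _ => None
  }

  /** For each (stage, task) pair, keeps the failure with the lowest attempt number. */
  def firstFailures(lines: Iterable[String]): Map[(Int, Int), TaskFailure] =
    lines.flatMap(parse(_))
      .groupBy(f => (f.stage, f.task))
      .map { case (key, failures) => key -> failures.minBy(_.attempt) }
}
```

Attempt 0 of a task is the one most likely to carry the original exception; later attempts may fail only because the output file from an earlier attempt is still present.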

So I tried to find the root cause. There is an error, "Task failed while writing rows", which I think may be the cause, but I can't find any solution for it online. I have been struggling with this for several days, and it would cost me quite a lot of money to try every possible fix on the full dataset. I would really appreciate it if someone could help me with this problem.

Here is the code:

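import java.nio.file.Paths
import java.util.regex.Pattern

import com.huaban.analysis.jieba.{JiebaSegmenter, WordDictionary}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.util.control.Breaks.{break, breakable}

object App {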

    def main(args: Array[String]): Unit = {

        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

        val conf = new SparkConf()

        val spark = SparkSession
        .builder
        .appName("coocurrence")
        .config("spark.master", "local")
        .config(conf)
        .getOrCreate()

        import spark.implicits._
        val sc = spark.sparkContext

        val startTime = System.nanoTime

        // Read dataset on s3.
        // val inputFilePath = "s3://commoncrawl/crawl-data/CC-MAIN-2019-18/segments/*/wet/*warc.wet.gz"
        val inputFilePath = "s3://zhaoyin1/resources/*"

        // var inputFilePath = "resources/*.gz" // Read dataset at local.
        val cf = new Configuration
        cf.set("textinputformat.record.delimiter", "WARC/1.0")
        val pattern = Pattern.compile("^(\\P{sc=Han}*\\p{sc=Han}){10}.*$", Pattern.DOTALL);
        val pattern1 = Pattern.compile("^(\\P{sc=Hiragana}*\\p{sc=Hiragana}).*$", Pattern.DOTALL);
        val pattern2 = Pattern.compile("^(\\P{sc=Hangul}*\\p{sc=Hangul}).*$", Pattern.DOTALL);
        val pattern3 = Pattern.compile("^(\\P{sc=Greek}*\\p{sc=Greek}).*$", Pattern.DOTALL);
        val pattern4 = Pattern.compile("^(\\P{sc=Cyrillic}*\\p{sc=Cyrillic}).*$", Pattern.DOTALL);

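        // Custom Jieba dictionary (contains China's cities, districts, and counties, with the
        // keywords "市" (city), "区" (district), and "县" (county) removed to ease word segmentation)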
        // val dir = "resources/dict/"
        // val filename = "resources/dict/customized_words.dict"
        // val spamfile = "resources/dict/spam_freq.dict"
        // val gambfile = "resources/dict/gamb.dict"
        val dir = "s3://zhaoyin1/dict/"
        val filename = "s3://zhaoyin1/dict/customized_words.dict"
        val spamfile = "s3://zhaoyin1/dict/spam_freq.dict"
        val gambfile = "s3://zhaoyin1/dict/gamb.dict"
        val dirPath = Paths.get(dir)
        WordDictionary.getInstance().loadUserDict(dirPath)

        // val lines = Source.fromFile(filename).getLines()
        val lines = sc.textFile(filename).collect()
        val city_names = (for (line <- lines) yield line.split(" ")(0)).toArray
        val lines1 = sc.textFile(spamfile).collect()
        val spam_names = (for (line <- lines1) yield line.split(" ")(0)).toArray
        val lines2 = sc.textFile(gambfile).collect()
        val gamb_names = (for (line <- lines2) yield line.split(" ")(0)).toArray

        // try {
            val webPages = sc.newAPIHadoopFile(
                    inputFilePath,
                    classOf[TextInputFormat],
                    classOf[LongWritable],
                    classOf[Text], cf)
                .map(x => x._2.toString)
                .filter(pattern.matcher(_).matches)
                .filter(!pattern1.matcher(_).matches)
                .filter(!pattern2.matcher(_).matches)
                .filter(!pattern3.matcher(_).matches)
                .filter(!pattern4.matcher(_).matches)
                .map(_.replace("\n", ""))
                .filter{x =>
                    var str = if (x.length > 0)
                    new JiebaSegmenter().sentenceProcess(x)
                    var strSet = str.toString
                        .replace("[", "")
                        .replace("]", "")
                        .replace(" ", "")
                        .split(",").toSet

                    var exist = false
                    var spam = false
                    var gamb = false
                    var target = ""

                    breakable {
                        for (spam_name <- spam_names) {
                            if (strSet(spam_name)) {
                                spam = true
                                break
                            }
                        }
                        if (!spam) {
                            for (gamb_name <- gamb_names) {
                                if (strSet(gamb_name)) {
                                    gamb = true
                                    break
                                }
                            }
                            if (!gamb) {
                                for (city <- city_names) {
                                    if (strSet(city)) {
                                        exist = true
                                        break
                                    }
                                }
                            }
                        }
                    }
                    !spam && !gamb && exist
                }
        // }

        var dfWebPage = webPages.toDF()

        dfWebPage.printSchema()

        // dfWebPage.write.option("maxRecordsPerFile", 50000).format("text")
        //  .option("compression", "gzip")
        //  .mode("overwrite").save("s3://zhaoyin1/output/out")

        val duration = (System.nanoTime - startTime) / 1e9d
        println(f"Time elapsed: $duration%10.5f s")

        sc.stop()
    }
}
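
The nested `breakable` loops in the last filter implement a three-step rule: reject a page that contains a spam word, then reject one that contains a gambling word, and keep it only if it mentions a city. As a minimal sketch of the same rule, assuming the segmented page is already available as a `Set[String]` of tokens (`PageFilter` and `keep` are hypothetical names, not part of the application above):

```scala
object PageFilter {
  /**
   * Keeps a page only if none of its tokens is a spam or gambling word
   * and at least one token is a city name.
   * A Set[String] is itself a String => Boolean, so it can be passed to `exists`.
   * `&&` short-circuits, so the checks run in the same order as the nested loops.
   */
  def keep(
      tokens: Set[String],
      spamWords: Seq[String],
      gamblingWords: Seq[String],
      cityNames: Seq[String]): Boolean =
    !spamWords.exists(tokens) &&
      !gamblingWords.exists(tokens) &&
      cityNames.exists(tokens)
}
```

With this helper, the body of the filter would reduce to building `strSet` and returning `PageFilter.keep(strSet, spam_names, gamb_names, city_names)`.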

Here is the log from steps/step-name/stderr.gz:

19/05/22 13:45:14 INFO Client: 
     client token: N/A
     diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
    at App$.main(App.scala:145)
    at App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1782 in stage 3.0 failed 4 times, most recent failure: Lost task 1782.3 in stage 3.0 (TID 2115, ip-172-31-65-55.ec2.internal, executor 25): org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://zhaoyin1/output/out/part-01782-807a6367-a6e8-4373-bb1a-4aebcc6b0601-c000.txt.gz
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.plan(RegularUploadPlanner.java:30)
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:601)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:810)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:212)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
    at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.<init>(TextFileFormat.scala:151)
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anon$1.newInstance(TextFileFormat.scala:84)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:233)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Here is the log from containers/.../stderr.gz (master):

19/05/22 13:38:51 WARN TaskSetManager: Lost task 1782.0 in stage 3.0 (TID 1788, ip-172-31-69-181.ec2.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
    at java.lang.Character.getType(Character.java:6924)
    at java.lang.Character$UnicodeScript.of(Character.java:4479)
    at java.util.regex.Pattern$Script.isSatisfiedBy(Pattern.java:3881)
    at java.util.regex.Pattern$CharProperty$1.isSatisfiedBy(Pattern.java:3773)
    at java.util.regex.Pattern$CharProperty.match(Pattern.java:3778)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4252)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
    at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
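
The `Caused by: java.lang.StackOverflowError` in this trace points into `java.util.regex.Pattern$Curly.match0`, which suggests that the script-matching regexes recurse too deeply on some long records. The following is a minimal sketch of one possible alternative, assuming it is acceptable to drop the regexes and count code points of a given Unicode script in a plain loop; `ScriptFilter` and `countAtLeast` are hypothetical names, not part of the original application.

```scala
object ScriptFilter {
  /**
   * Returns true if `text` contains at least `min` code points that belong
   * to `script`. Uses a loop instead of a regex, so the stack depth does
   * not grow with the length of the input.
   */
  def countAtLeast(text: String, script: Character.UnicodeScript, min: Int): Boolean = {
    var i = 0
    var count = 0
    while (i < text.length && count < min) {
      val cp = text.codePointAt(i)
      if (Character.UnicodeScript.of(cp) == script) count += 1
      // Advance by 1 or 2 chars, depending on whether cp is a supplementary code point.
      i += Character.charCount(cp)
    }
    count >= min
  }
}
```

Under that assumption, the check done by `pattern` corresponds to `ScriptFilter.countAtLeast(x, Character.UnicodeScript.HAN, 10)`, and each of `pattern1` to `pattern4` corresponds to `ScriptFilter.countAtLeast(x, script, 1)` with `HIRAGANA`, `HANGUL`, `GREEK`, or `CYRILLIC` as the script.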

Here is the log from containers/.../stderr.gz (worker):

19/05/22 13:37:14 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/05/22 13:37:14 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/05/22 13:37:14 INFO DirectFileOutputCommitter: Direct Write: ENABLED
19/05/22 13:37:14 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter
19/05/22 13:37:14 INFO S3NativeFileSystem: Opening 's3://commoncrawl/crawl-data/CC-MAIN-2019-18/segments/1555578517682.16/wet/CC-MAIN-20190418141430-20190418163430-00219.warc.wet.gz' for reading
19/05/22 13:37:14 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/05/22 13:37:14 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/05/22 13:37:14 INFO DirectFileOutputCommitter: Direct Write: ENABLED
19/05/22 13:37:14 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1032.0 in stage 3.0 (TID 1038), reason: Stage cancelled
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1015.0 in stage 3.0 (TID 1021), reason: Stage cancelled
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1030.0 in stage 3.0 (TID 1036), reason: Stage cancelled
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1028.0 in stage 3.0 (TID 1034), reason: Stage cancelled
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1059.0 in stage 3.0 (TID 1065), reason: Stage cancelled
19/05/22 13:39:04 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException
    at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
...