I am writing a Spark application on AWS EMR to process roughly 7 TB of Common Crawl data (mostly filtering out useful web pages for further research) and then write the filtered pages to compressed text files (about 300 GB in total). The source dataset consists of 56,000 sub-files, each compressed sub-file being roughly 100 MB. I experimented with only a portion of the data (about 500 sub-files) and the result looked good (the job completed successfully). But when I run the application on the whole dataset, I always get a "File already exists" error. For reference, I used 30 c4.8xlarge machines (each with a 20 GB EBS root volume), one of them acting as the master.
I searched for this error online, and the explanation I found says:
The Spark task may have failed for a different reason, and it only throws this "IOException: File already exists" after retries of the original error.
So I tried to find the root cause. There is an error like "Task failed while writing rows", which I suspect may be the real cause, but I cannot find any solution for it online. I have been struggling with this for several days, and trying every candidate solution would cost me quite a few dollars, so I would really appreciate it if someone could help me with this problem.
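If that explanation is correct, the "File already exists" would only be a side effect: a failed task attempt leaves a partial part file behind in s3://zhaoyin1/output/out (the worker log below shows the direct-write committer is enabled, so task output goes straight to the final S3 prefix), and the retried attempt then collides with it. As a diagnostic sketch (my assumption, not a verified fix), disabling task retries via spark.task.maxFailures should at least surface the original error instead of the secondary IOException:

// Diagnostic sketch only: with retries disabled, the job fails on the first
// task failure, so the driver log shows the original exception instead of the
// FileAlreadyExistsException produced by the retried attempt.
val conf = new SparkConf()
  .set("spark.task.maxFailures", "1") // default is 4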
Here is the code:
// Imports assumed for this snippet (the jieba classes are assumed to come from
// the com.huaban jieba-analysis library):
import java.nio.file.Paths
import java.util.regex.Pattern

import scala.util.control.Breaks._

import com.huaban.analysis.jieba.{JiebaSegmenter, WordDictionary}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val conf = new SparkConf()
    val spark = SparkSession
      .builder
      .appName("coocurrence")
      .config("spark.master", "local")
      .config(conf)
      .getOrCreate()
    import spark.implicits._
    val sc = spark.sparkContext
    val startTime = System.nanoTime

    // Read dataset on s3.
    // val inputFilePath = "s3://commoncrawl/crawl-data/CC-MAIN-2019-18/segments/*/wet/*warc.wet.gz"
    val inputFilePath = "s3://zhaoyin1/resources/*"
    // var inputFilePath = "resources/*.gz" // Read dataset at local.

    val cf = new Configuration
    cf.set("textinputformat.record.delimiter", "WARC/1.0")

    val pattern = Pattern.compile("^(\\P{sc=Han}*\\p{sc=Han}){10}.*$", Pattern.DOTALL)
    val pattern1 = Pattern.compile("^(\\P{sc=Hiragana}*\\p{sc=Hiragana}).*$", Pattern.DOTALL)
    val pattern2 = Pattern.compile("^(\\P{sc=Hangul}*\\p{sc=Hangul}).*$", Pattern.DOTALL)
    val pattern3 = Pattern.compile("^(\\P{sc=Greek}*\\p{sc=Greek}).*$", Pattern.DOTALL)
    val pattern4 = Pattern.compile("^(\\P{sc=Cyrillic}*\\p{sc=Cyrillic}).*$", Pattern.DOTALL)
    // Custom jieba dictionary (Chinese city/district/county names, with the
    // 市 (city), 区 (district), 县 (county) suffixes stripped so segmentation matches them)
    // val dir = "resources/dict/"
    // val filename = "resources/dict/customized_words.dict"
    // val spamfile = "resources/dict/spam_freq.dict"
    // val gambfile = "resources/dict/gamb.dict"
    val dir = "s3://zhaoyin1/dict/"
    val filename = "s3://zhaoyin1/dict/customized_words.dict"
    val spamfile = "s3://zhaoyin1/dict/spam_freq.dict"
    val gambfile = "s3://zhaoyin1/dict/gamb.dict"

    val dirPath = Paths.get(dir)
    WordDictionary.getInstance().loadUserDict(dirPath)

    // val lines = Source.fromFile(filename).getLines()
    val lines = sc.textFile(filename).collect()
    val city_names = (for (line <- lines) yield line.split(" ")(0)).toArray
    val lines1 = sc.textFile(spamfile).collect()
    val spam_names = (for (line <- lines1) yield line.split(" ")(0)).toArray
    val lines2 = sc.textFile(gambfile).collect()
    val gamb_names = (for (line <- lines2) yield line.split(" ")(0)).toArray

    // try {
    val webPages = sc.newAPIHadoopFile(
        inputFilePath,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text], cf)
      .map(x => x._2.toString)
      .filter(pattern.matcher(_).matches)
      .filter(!pattern1.matcher(_).matches)
      .filter(!pattern2.matcher(_).matches)
      .filter(!pattern3.matcher(_).matches)
      .filter(!pattern4.matcher(_).matches)
      .map(_.replace("\n", ""))
      .filter { x =>
        var str = if (x.length > 0)
          new JiebaSegmenter().sentenceProcess(x)
        var strSet = str.toString
          .replace("[", "")
          .replace("]", "")
          .replace(" ", "")
          .split(",").toSet
        var exist = false
        var spam = false
        var gamb = false
        var target = ""
        breakable {
          for (spam_name <- spam_names) {
            if (strSet(spam_name)) {
              spam = true
              break
            }
          }
          if (!spam) {
            for (gamb_name <- gamb_names) {
              if (strSet(gamb_name)) {
                gamb = true
                break
              }
            }
            if (!gamb) {
              for (city <- city_names) {
                if (strSet(city)) {
                  exist = true
                  break
                }
              }
            }
          }
        }
        !spam && !gamb && exist
      }
    // }

    var dfWebPage = webPages.toDF()
    dfWebPage.printSchema()
    // dfWebPage.write.option("maxRecordsPerFile", 50000).format("text")
    //   .option("compression", "gzip")
    //   .mode("overwrite").save("s3://zhaoyin1/output/out")

    val duration = (System.nanoTime - startTime) / 1e9d
    println(f"Time elapsed: $duration%10.5f s")
    sc.stop()
  }
}
Here is the log file from steps/step-name/stderr.gz:
19/05/22 13:45:14 INFO Client:
client token: N/A
diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
at App$.main(App.scala:145)
at App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1782 in stage 3.0 failed 4 times, most recent failure: Lost task 1782.3 in stage 3.0 (TID 2115, ip-172-31-65-55.ec2.internal, executor 25): org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://zhaoyin1/output/out/part-01782-807a6367-a6e8-4373-bb1a-4aebcc6b0601-c000.txt.gz
at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.plan(RegularUploadPlanner.java:30)
at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:810)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:212)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.<init>(TextFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anon$1.newInstance(TextFileFormat.scala:84)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:233)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here is the log from containers/.../stderr.gz (master node):
19/05/22 13:38:51 WARN TaskSetManager: Lost task 1782.0 in stage 3.0 (TID 1788, ip-172-31-69-181.ec2.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
at java.lang.Character.getType(Character.java:6924)
at java.lang.Character$UnicodeScript.of(Character.java:4479)
at java.util.regex.Pattern$Script.isSatisfiedBy(Pattern.java:3881)
at java.util.regex.Pattern$CharProperty$1.isSatisfiedBy(Pattern.java:3773)
at java.util.regex.Pattern$CharProperty.match(Pattern.java:3778)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4252)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
at java.util.regex.Pattern$Curly.match0(Pattern.java:4265)
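This StackOverflowError looks like the real root cause to me: the DOTALL patterns are matched against entire WET records, and the repeated Pattern$Curly frames suggest the greedy quantifiers recurse roughly once per matched character, so a long enough record overflows the task thread's stack; the task then dies while writing rows, and the retried attempt runs into the FileAlreadyExistsException shown above. A stack-safe alternative I am considering (just a sketch, assuming that counting Han code points is an acceptable replacement for the (\P{sc=Han}*\p{sc=Han}){10} check) would be:

// Non-recursive replacement for the Han-script regex filter: count code points
// whose Unicode script is HAN and require at least n of them, instead of letting
// java.util.regex backtrack over the whole record.
def containsAtLeastNHan(s: String, n: Int): Boolean = {
  var count = 0
  var i = 0
  while (i < s.length && count < n) {
    val cp = s.codePointAt(i)
    if (Character.UnicodeScript.of(cp) == Character.UnicodeScript.HAN) count += 1
    i += Character.charCount(cp)
  }
  count >= n
}

// Usage in the pipeline, replacing .filter(pattern.matcher(_).matches):
// .filter(containsAtLeastNHan(_, 10))

Alternatively, increasing the executor thread stack size (for example via spark.executor.extraJavaOptions=-Xss8m) might work around the recursion, but I am not sure that holds up for the largest records.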
Here is the log from containers/.../stderr.gz (worker node):
19/05/22 13:37:14 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/05/22 13:37:14 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/05/22 13:37:14 INFO DirectFileOutputCommitter: Direct Write: ENABLED
19/05/22 13:37:14 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter
19/05/22 13:37:14 INFO S3NativeFileSystem: Opening 's3://commoncrawl/crawl-data/CC-MAIN-2019-18/segments/1555578517682.16/wet/CC-MAIN-20190418141430-20190418163430-00219.warc.wet.gz' for reading
19/05/22 13:37:14 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/05/22 13:37:14 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
19/05/22 13:37:14 INFO DirectFileOutputCommitter: Direct Write: ENABLED
19/05/22 13:37:14 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1032.0 in stage 3.0 (TID 1038), reason: Stage cancelled
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1015.0 in stage 3.0 (TID 1021), reason: Stage cancelled
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1030.0 in stage 3.0 (TID 1036), reason: Stage cancelled
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1028.0 in stage 3.0 (TID 1034), reason: Stage cancelled
19/05/22 13:39:04 INFO Executor: Executor is trying to kill task 1059.0 in stage 3.0 (TID 1065), reason: Stage cancelled
19/05/22 13:39:04 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException
at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)