Spark создает java.io.IOException: не удалось переименовать при сохранении part-xxxxx.gz. - PullRequest
0 голосов
/ 26 июня 2018

Новый пользователь Spark здесь. Я извлекаю функции из многих изображений .tif, хранящихся на AWS S3, каждое с идентификатором, например 02_R4_C7. Я использую Spark 2.2.1 и hadoop 2.7.2.

Я использую все конфигурации по умолчанию, например:

conf = SparkConf().setAppName("Feature Extraction")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

А вот вызов функции, который завершается неудачно после успешного сохранения некоторых функций в папке идентификатора изображения в виде файлов part-xxxx.gz:

features_labels_rdd.saveAsTextFile(text_rdd_direct,"org.apache.hadoop.io.compress.GzipCodec")

См. Ошибку ниже. Когда я удаляю файлы компонента part-xxxx.gz, которые были успешно созданы, и повторно запускаю сценарий, происходит сбой на другом изображении и файле part-xxxxx.gz, по-видимому, недетерминированным способом. Я проверяю, чтобы удалить все функции перед повторным запуском. Моя теория состоит в том, что два рабочих пытаются создать один и тот же временный файл и конфликтуют друг с другом, поскольку для одного и того же файла существуют два одинаковых сообщения об ошибках, но с интервалом в одну секунду.

Я в недоумении, что с этим делать, я видел списки свечей конфигурации , которые могут изменить то, как искра обрабатывает задачи, но я не уверен, что здесь поможет, так как я не не понимаю проблему, которую я имею. Любая помощь с благодарностью!

SLF4J: Class path contains multiple SLF4J bindings.
*SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j- 
log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:24:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:24:41 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
 Feature file of 02_R4_C7 is created                                            
[Stage 3:=================>                                       (6 + 14) / 20]18/06/26 19:24:58 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626192453_0003_m_000007_59
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
    at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
    at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
    at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[Stage 3:=====================================>                   (13 + 7) / 20]18/06/26 19:24:58 ERROR executor.Executor: Exception in task 7.0 in stage 3.0 (TID 59)
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
    at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
    at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
    at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/06/26 19:24:58 ERROR scheduler.TaskSetManager: Task 7 in stage 3.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "run_feature_extraction_spark.py", line 88, in <module>
    main(sc)
  File "run_feature_extraction_spark.py", line 75, in main
    features_labels_rdd.saveAsTextFile(text_rdd_direct, "org.apache.hadoop.io.compress.GzipCodec")
  File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/rdd.py", line 1551, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o76.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 3.0 failed 1 times, most recent failure: Lost task 7.0 in stage 3.0 (TID 59, localhost, executor driver): java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz*

И когда я запускаю его снова, скрипт делает его дальше, но завершается неудачно с той же ошибкой с другой папкой изображений и файлом part-xxxx.gz

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:37:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:37:24 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512

 Feature file of 02_R4_C7 is created                                            
 Feature file of 02_R4_C6 is created                                            
 Feature file of 02_R4_C5 is created                                            
 Feature file of 02_R4_C4 is created                                            
 Feature file of 02_R4_C3 is created                                            
 Feature file of 02_R4_C2 is created                                            
 Feature file of 02_R4_C1 is created                                            
[Stage 15:==========================================>             (15 + 5) / 20]18/06/26 19:38:16 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626193811_0015_m_000017_285
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/_temporary/0/_temporary/attempt_20180626193811_0015_m_000017_285/part-00017.gz; isDirectory=false; length=896020; replication=1; blocksize=67108864; modification_time=1530041897000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/part-00017.gz

Ответы [ 2 ]

0 голосов
/ 06 июня 2019

Решения в посте @Steve Loughran великолепны. Просто добавить небольшую информацию, чтобы помочь объяснить проблему.

Hadoop-2.7 использует протокол фиксации Hadoop для фиксации. Когда Spark сохраняет результат в S3, он сначала сначала сохраняет временный результат в S3 и делает его видимым, переименовывая его при успешном выполнении задания (причину и детали можно найти в этом замечательном документе ). Однако S3 является хранилищем объектов и не имеет реального «переименования»; это копирование данных в целевой объект, затем удаление исходного объекта.

S3 «в конечном итоге непротиворечивый», что означает, что операция delete может произойти до полной синхронизации copy . Когда это произойдет, переименование не удастся.

В моих случаях это срабатывало только на некоторых цепочечных заданиях. Я не видел это в простом спасении.

0 голосов
/ 26 июня 2018

Не безопасно использовать S3 в качестве прямого места назначения работы без «уровня согласованности» (Consistent EMR или самого проекта Apache Hadoop, S3Guard) или специального выходного коммиттера, специально разработанного для работы с S3 (Hadoop 3.1). + "коммитеры S3A"). Переименование - это то, где что-то не так, поскольку перечисление несоответствий означает, что при сканировании копируемых файлов могут отсутствовать данные или обнаруживаются удаленные файлы, которые невозможно переименовать. Ваша трассировка стека выглядит именно так, как я ожидал бы, чтобы это всплыло на поверхность: рабочие коммиты, по-видимому, не выполняются случайным образом.

Вместо того, чтобы вдаваться в подробности, вот видео Райан Блю на тему

Обходной путь: напишите в локальный кластер FS, затем используйте distcp для загрузки на S3.

PS: для Hadoop 2.7+ переключитесь на разъем s3a: //. У него точно такая же проблема согласованности без включенного S3Guard, но более высокая производительность.

...