Я записываю фрейм данных spark (2.4.4) в S3 и получаю sporadi c java .io.IOException: Не удалось переименовать ошибки S3AFileStatus. Spark отделен от установленной версии oop 3.1.3.
Ошибка известна и вызвана возможной последовательностью S3, которая означает возможную немедленную несогласованность, например, она описана в HAD OOP -14161 .
Однако я использую S3guard, и подробные журналы отладки показывают, что он работает. Я получаю все записи в таблице DynamoDB, метрики DynamoDB показывают чтение и запись, информация s3guard показывает, что она включена и т. Д. c.
Я понимаю, что файловый коммиттер по умолчанию не идеален, но не должен Я избегаю проблемы, потому что S3guard гарантирует немедленную согласованность?
Другими словами, почему я все еще получаю ошибку, несмотря на S3guard? Заранее спасибо за ваше время!
WARN org.apache.spark.scheduler.TaskSetManager - Lost task 49.0 in stage 936.0 (TID 72604, 10.0.62.191, executor 0): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to rename S3AFileStatus{path=s3a://my-spark-bucket/persisted_data/00/c9d61ba03f114d5f937eb8bdbe9ac2bc__9d6ba987-06e1-4e66-be88-f518ede26a4c_/rtg.parquet/_temporary/0/_temporary/attempt_20200123150921_0936_m_000049_72604/part-00049-f9ef3dcb-0928-466d-a663-39f023f401b8-c000.snappy.parquet; isDirectory=false; length=180872; replication=1; blocksize=33554432; modification_time=1579792215000; access_time=0; owner=root; group=root; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=true; isErasureCoded=false} isEmptyDirectory=FALSE to s3a://my-spark-bucket/persisted_data/00/c9d61ba03f114d5f937eb8bdbe9ac2bc__9d6ba987-06e1-4e66-be88-f518ede26a4c__Export/rtg.parquet/part-00049-f9ef3dcb-0928-466d-a663-39f023f401b8-c000.snappy.parquet
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:475)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:488)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:605)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:568)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
... 10 more