Я использую коммиттер S3A (промежуточный коммиттер каталогов) со своим хранилищем объектов (не AWS), где я пытаюсь загрузить каталог большого размера (50 ТБ в пространстве), используя клиент Cloudera (hadoop). Он использует Map Reduce для загрузки каталога в мое хранилище объектов. S3A Stage Committer инициирует различные задачи для выполнения операций MultiPart Upload (MPU) в хранилище объектов, и все они фиксируются на этапе job_commit.
Проблема и вопрос:
Журналы MapReduce, как видно на клиенте hadoop
В моем случае все фиксации задачи завершены успешно, но на этапе S3A commit_commit job_commit it fails
и ошибка, которую я вижу,
INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=FAILED. Redirecting to the job history server
INFO mapreduce.Job: map 0% reduce 100%
INFO mapreduce.Job: Job job_1553199983818_0003 failed with state FAILED due to Job commit from a prior MRAppMaster attempt is potentially in progress. Preventing multiple commit executions
-> Are these a warning only?
, поскольку ни одна из моих задач не была выполнена на RM
S3A регистрирует ошибки коммиттера на этапе job_commit, и он начинает удалять файлы. Я не мог понять, почему job_commit не удалось из-под журналов
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter is org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Attempt num: 2 is last retry: true because a commit was started.
INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType for class org.apache.hadoop.mapreduce.v2.app.MRAppMaster$NoopEventHandler
INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.jobhistory.EventType for class org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler
INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator$EventType for class org.apache.hadoop.mapreduce.v2.app.MRAppMaster$ContainerAllocatorRouter
[main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]
INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]
INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]
INFO [main] org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler: Emitting job history data to the timeline server is not enabled
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Not attempting to recover. Recovery is not supported by class org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter. Use an OutputCommitter that supports recovery.
INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Previous history file is at hdfs://xxxxxx:8020/user/xxxxxx/.staging/job_1553199983818_0003/job_1553199983818_0003_1.jhist
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Starting to clean up previous job's temporary files
INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Task committer attempt_1553199983818_0003_m_000000_0: aborting job job_1553199983818_0003 in state FAILED
INFO [main] org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter: Starting: Task committer attempt_1553199983818_0003_m_000000_0: aborting job in state job_1553199983818_0003
INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Task committer attempt_1553199983818_0003_m_000000_0: no pending commits to abort
INFO [main] org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter: Task committer attempt_1553199983818_0003_m_000000_0: aborting job in state job_1553199983818_0003 : duration 0:00.007s
INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Starting: Cleanup job job_1553199983818_0003
INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Starting: Aborting all pending commits under s3a://xxxxxx/user/xxxxxx/30000
Журналы пряжи мало что объясняют, почему сбой S3 job_committer. Любое понимание того, что я могу посмотреть, чтобы понять это.
try {
0309 String user = UserGroupInformation.getCurrentUser().getShortUserName();
0310 Path stagingDir = MRApps.getStagingAreaDir(conf, user);
0311 FileSystem fs = getFileSystem(conf);
0312
0313 boolean stagingExists = fs.exists(stagingDir);
0314 Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
0315 boolean commitStarted = fs.exists(startCommitFile);
0316 Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
0317 boolean commitSuccess = fs.exists(endCommitSuccessFile);
0318 Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
0319 boolean commitFailure = fs.exists(endCommitFailureFile);
0320 if(!stagingExists) {
0321 isLastAMRetry = true;
0322 LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
0323 " is last retry: " + isLastAMRetry +
0324 " because the staging dir doesn't exist.");
0325 errorHappenedShutDown = true;
0326 forcedState = JobStateInternal.ERROR;
0327 shutDownMessage = "Staging dir does not exist " + stagingDir;
0328 LOG.fatal(shutDownMessage);
0329 } else if (commitStarted) {
0330 //A commit was started so this is the last time, we just need to know
0331 // what result we will use to notify, and how we will unregister
0332 errorHappenedShutDown = true;
0333 isLastAMRetry = true;
0334 LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
0335 " is last retry: " + isLastAMRetry +
0336 " because a commit was started.");
0337 copyHistory = true;
0338 if (commitSuccess) {
0339 shutDownMessage =
0340 "Job commit succeeded in a prior MRAppMaster attempt " +
0341 "before it crashed. Recovering.";
0342 forcedState = JobStateInternal.SUCCEEDED;
0343 } else if (commitFailure) {
0344 shutDownMessage =
0345 "Job commit failed in a prior MRAppMaster attempt " +
0346 "before it crashed. Not retrying.";
0347 forcedState = JobStateInternal.FAILED;
0348 } else {
0349 if (isCommitJobRepeatable()) {
0350 // cleanup previous half done commits if committer supports
0351 // repeatable job commit.
0352 errorHappenedShutDown = false;
0353 cleanupInterruptedCommit(conf, fs, startCommitFile);
0354 } else {
0355 //The commit is still pending, commit error
0356 shutDownMessage =
0357 "Job commit from a prior MRAppMaster attempt is " +
0358 "potentially in progress. Preventing multiple commit executions";
0359 forcedState = JobStateInternal.ERROR;
0360 }
0361 }
0362 }
0363 } catch (IOException e) {
0364 throw new YarnRuntimeException("Error while initializing", e);
0365 }