S3 committer failure at the job_commit stage in MRAppMaster
March 29, 2019

I am using the S3A committer (the directory staging committer) with my own object store (not AWS). I am trying to upload a large directory (50 TB) using the Cloudera Hadoop client, which uses MapReduce to upload the directory to my object store. The S3A staging committer starts a number of tasks that perform multipart upload (MPU) operations against the object store, and all of them are committed in the job_commit stage.

Problem and question:

  1. MapReduce logs as seen on the Hadoop client
    In my case all task commits complete successfully, but the job fails at the S3A committer's job_commit stage. The error I see is:

    INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=FAILED. Redirecting to the job history server
    INFO mapreduce.Job: map 0% reduce 100%
    INFO mapreduce.Job: Job job_1553199983818_0003 failed with state FAILED due to Job commit from a prior MRAppMaster attempt is potentially in progress. Preventing multiple commit executions

    Is this only a warning? I ask because none of my tasks failed on the RM.

  2. The S3A committer logs errors at the job_commit stage and then starts deleting files. I could not work out from the logs below why job_commit failed.

INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter is org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Attempt num: 2 is last retry: true because a commit was started.
INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType for class org.apache.hadoop.mapreduce.v2.app.MRAppMaster$NoopEventHandler
INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.jobhistory.EventType for class org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler
INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator$EventType for class org.apache.hadoop.mapreduce.v2.app.MRAppMaster$ContainerAllocatorRouter
INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]
INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]
INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]
INFO [main] org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler: Emitting job history data to the timeline server is not enabled
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Not attempting to recover. Recovery is not supported by class org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter. Use an OutputCommitter that supports recovery.
INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Previous history file is at hdfs://xxxxxx:8020/user/xxxxxx/.staging/job_1553199983818_0003/job_1553199983818_0003_1.jhist
INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Starting to clean up previous job's temporary files
INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Task committer attempt_1553199983818_0003_m_000000_0: aborting job job_1553199983818_0003 in state FAILED
INFO [main] org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter: Starting: Task committer attempt_1553199983818_0003_m_000000_0: aborting job in state job_1553199983818_0003
INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Task committer attempt_1553199983818_0003_m_000000_0: no pending commits to abort
INFO [main] org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter: Task committer attempt_1553199983818_0003_m_000000_0: aborting job in state job_1553199983818_0003 : duration 0:00.007s
INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Starting: Cleanup job job_1553199983818_0003
INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Starting: Aborting all pending commits under s3a://xxxxxx/user/xxxxxx/30000

The YARN logs do not explain much about why the S3A job commit failed. Any insight into what I could look at to understand this would be appreciated.

try {
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  Path stagingDir = MRApps.getStagingAreaDir(conf, user);
  FileSystem fs = getFileSystem(conf);

  boolean stagingExists = fs.exists(stagingDir);
  Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
  boolean commitStarted = fs.exists(startCommitFile);
  Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
  boolean commitSuccess = fs.exists(endCommitSuccessFile);
  Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
  boolean commitFailure = fs.exists(endCommitFailureFile);
  if (!stagingExists) {
    isLastAMRetry = true;
    LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
        " is last retry: " + isLastAMRetry +
        " because the staging dir doesn't exist.");
    errorHappenedShutDown = true;
    forcedState = JobStateInternal.ERROR;
    shutDownMessage = "Staging dir does not exist " + stagingDir;
    LOG.fatal(shutDownMessage);
  } else if (commitStarted) {
    // A commit was started so this is the last time, we just need to know
    // what result we will use to notify, and how we will unregister
    errorHappenedShutDown = true;
    isLastAMRetry = true;
    LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
        " is last retry: " + isLastAMRetry +
        " because a commit was started.");
    copyHistory = true;
    if (commitSuccess) {
      shutDownMessage =
          "Job commit succeeded in a prior MRAppMaster attempt " +
          "before it crashed. Recovering.";
      forcedState = JobStateInternal.SUCCEEDED;
    } else if (commitFailure) {
      shutDownMessage =
          "Job commit failed in a prior MRAppMaster attempt " +
          "before it crashed. Not retrying.";
      forcedState = JobStateInternal.FAILED;
    } else {
      if (isCommitJobRepeatable()) {
        // cleanup previous half done commits if committer supports
        // repeatable job commit.
        errorHappenedShutDown = false;
        cleanupInterruptedCommit(conf, fs, startCommitFile);
      } else {
        // The commit is still pending, commit error
        shutDownMessage =
            "Job commit from a prior MRAppMaster attempt is " +
            "potentially in progress. Preventing multiple commit executions";
        forcedState = JobStateInternal.ERROR;
      }
    }
  }
} catch (IOException e) {
  throw new YarnRuntimeException("Error while initializing", e);
}
...
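
Read against the logs in the question, the excerpt above suggests how the message arises. Attempt 2 logged "because a commit was started", and the job ended with "potentially in progress". The quoted code only sets that message when the start-commit marker exists, neither the success nor the failure marker exists, and isCommitJobRepeatable() returns false. The following is a minimal Java sketch of that decision table only. It is not Hadoop code: the class name, the enum, and the decide method are hypothetical, and the boolean parameters stand in for the fs.exists(...) checks and the isCommitJobRepeatable() call in the excerpt.

```java
// Hypothetical stand-alone model of the branch logic quoted above.
// It does not touch HDFS or YARN; every name here is illustrative.
public class CommitRecoverySketch {

    /** Possible results of the restart check (names are not Hadoop's). */
    public enum Outcome {
        NORMAL_START,                    // no branch taken: nothing is forced
        ERROR_NO_STAGING_DIR,            // staging directory is missing
        RECOVER_AS_SUCCEEDED,            // success marker left by a prior attempt
        RECOVER_AS_FAILED,               // failure marker left by a prior attempt
        CLEAN_UP_AND_RETRY_COMMIT,       // committer allows repeating job commit
        ERROR_COMMIT_POSSIBLY_IN_PROGRESS
    }

    /** Mirrors the if/else chain of the excerpt, one boolean per check. */
    public static Outcome decide(boolean stagingExists,
                                 boolean commitStarted,
                                 boolean commitSuccess,
                                 boolean commitFailure,
                                 boolean commitJobRepeatable) {
        if (!stagingExists) {
            return Outcome.ERROR_NO_STAGING_DIR;
        }
        if (!commitStarted) {
            return Outcome.NORMAL_START;
        }
        if (commitSuccess) {
            return Outcome.RECOVER_AS_SUCCEEDED;
        }
        if (commitFailure) {
            return Outcome.RECOVER_AS_FAILED;
        }
        return commitJobRepeatable
                ? Outcome.CLEAN_UP_AND_RETRY_COMMIT
                : Outcome.ERROR_COMMIT_POSSIBLY_IN_PROGRESS;
    }

    public static void main(String[] args) {
        // The combination implied by the logs in the question: commit started,
        // no end marker of either kind, committer not repeatable.
        System.out.println(decide(true, true, false, false, false));
    }
}
```

If this reading is right, the second attempt's log can only report that a commit was started and never finished. The reason the first MRAppMaster attempt stopped during job commit would have to come from that first attempt's own log.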