I am new to Java programming, and I'm getting an exception when calling the waitForCompletion method of the Job class. The exception is thrown while waiting for a MapReduce job to finish.
Log excerpt:
INFO client.RMProxy: Connecting to ResourceManager at ip-xxx-xx-xx-xxx.ec2.internal/xxx-xx-xx-xxx:8032
WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO input.FileInputFormat: Total input paths to process : 5
INFO mapreduce.JobSubmitter: number of splits:5
INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1546590586763_0111
INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/root/.staging/job_1546590586763_0111
WARN server.InsertionServer: Exception.
org.apache.commons.lang.NotImplementedException: This method is implemented by ResourcePBImpl
at org.apache.hadoop.yarn.api.records.Resource.setMemorySize(Resource.java:111)
at org.apache.hadoop.mapred.YARNRunner.createApplicationSubmissionContext(YARNRunner.java:340)
at org.apache.hadoop.mapred.YARNRunner.submitJob(YARNRunner.java:296)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:240)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
at InsertionServer.start_and_run_job(InsertionServer.java:201)
at InsertionServer.main(InsertionServer.java:344)
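(One side note: the WARN line above says to implement the Tool interface and run the application through ToolRunner. If I understand the Hadoop docs correctly, the driver would look roughly like the sketch below; the class name InsertionServerDriver is made up, only the Hadoop API calls are real. I haven't ruled out that this is related.)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver showing the Tool/ToolRunner pattern the WARN refers to.
public class InsertionServerDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already has the generic Hadoop options
        // (-D, -files, -libjars, ...) applied by ToolRunner.
        Configuration conf = getConf();
        // ... build and submit the job here using conf ...
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic options before invoking run().
        System.exit(ToolRunner.run(new Configuration(), new InsertionServerDriver(), args));
    }
}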
And here is the code fragment:
public static void start_and_run_job() {
    String tmpFile = null;
    Set<String> inputFileList = new HashSet<String>(50);
    long fileSize = 0;

    // Collect every ".ready" file from the HDFS input directory.
    try {
        FileStatus[] status = hdfsFs.listStatus(new Path(hdfsInputDir));
        for (int i = 0; i < status.length; i++) {
            tmpFile = status[i].getPath().getName();
            if (tmpFile.indexOf(".ready") == -1) {
                continue; // not ready yet, skip it
            }
            fileSize += status[i].getLen();
            inputFileList.add(tmpFile);
        }
    } catch (Exception e) {
        LOG.info("File not found", e);
    }

    Object[] inputList = inputFileList.toArray();
    if (inputList.length == 0) {
        return; // nothing to process
    }

    try {
        // Bail out if any collected file has disappeared in the meantime.
        for (int i = 0; i < inputList.length; i++) {
            if (!hdfsFs.exists(new Path(hdfsInputDir + "/" + inputList[i].toString()))) {
                tmpFile = null;
                return;
            }
        }

        conf.set("hadoop.tmp.dir", baseDir);
        Job job = new Job(conf, tableName);
        job.setWorkingDirectory(new Path(baseDir));
        job.setJobName(tableName);
        job.setJarByClass(InsertionServer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        job.setMapperClass(InsertionServer.NewBulkLoadMap.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        for (int i = 0; i < inputList.length; i++) {
            FileInputFormat.addInputPath(job, new Path(hdfsInputDir + "/" + inputList[i].toString()));
        }
        FileOutputFormat.setOutputPath(job, new Path(hdfsOutputDir));
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        // The exception in the log above is thrown from inside this call.
        job.waitForCompletion(true);

        // Bulk-load the generated HFiles into the HBase table.
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(hdfsOutputDir), hTable);
    } catch (Exception e) {
        LOG.warn("Exception. ", e);
    }
}
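(Two things I'm aware of but haven't changed yet, in case they matter: the Job(Configuration, String) constructor is deprecated since Hadoop 2 in favor of the Job.getInstance factory, and waitForCompletion returns a success flag that the fragment currently ignores. The adjusted calls would look like this, reusing conf and tableName from above:)
// Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
Job job = Job.getInstance(conf, tableName);

// ... same job setup as in the fragment above ...

// waitForCompletion(true) returns false when the job fails, so the
// HFile bulk load can be skipped instead of loading partial output.
if (!job.waitForCompletion(true)) {
    LOG.warn("Job " + tableName + " failed; skipping bulk load.");
    return;
}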
I've searched around for a possible solution but couldn't find much help.
Can anyone help me resolve this exception?
Thanks in advance,
Soumyadutta Das