Во время фазы сокращения я пишу в контексте общее количество обработанных строк и, если операция уменьшения была успешной. Но внутри основной функции меня интересует агрегированное значение для всех строк из всех фаз сокращения. Например, если у меня есть 3 редуктора со значениями счетчика процессов как 3, 2, 5. В основной функции, если я могу получить доступ к этим значениям и агрегировать их как 10.
Пожалуйста, найдите соответствующий код ниже:
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
try {
....
processRowCount.set(sum);
success = true;
} catch (Exception ex) {
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
outputStream.flush();
outputStream.close();
while(true) {
if (channel.isClosed()) {
break;
}
try{ Thread.sleep(1000); } catch(Exception e) {}
}
channel.disconnect();
session.disconnect();
context.write(new Text("successful"), new Text(String.valueOf(success)));
context.write(new Text("row_count"), new Text(String.valueOf(processRowCount)));
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
CommandLine cmd = DistributedTsLoad.parseArgs(args, conf);
DistributedLoad.fillHadoopConfiguration(cmd, conf);
Job job = Job.getInstance(conf, "distributed-load");
job.setJarByClass(DistributedLoad.class);
job.setMapperClass(TsLoadRowMapper.class);
job.setReducerClass(TsLoadRowReducer.class);
job.setNumReduceTasks(conf.getInt("num-reducers", 1));
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
for(Option opt : cmd.getOptions()) {
switch (opt.getLongOpt()) {
case "input":
for (String input : opt.getValues()) {
Path inputPath = new Path(input);
FileInputFormat.addInputPath(job, inputPath);
}
break;
case "output":
Path outputPath = new Path(opt.getValue());
FileOutputFormat.setOutputPath(job, outputPath);
break;
}
}
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}