Как получить доступ к context.write (значение ключа) в основной функции - PullRequest
0 голосов
/ 23 октября 2019

Во время фазы сокращения я пишу в контексте общее количество обработанных строк и, если операция уменьшения была успешной. Но внутри основной функции меня интересует агрегированное значение для всех строк из всех фаз сокращения. Например, если у меня есть 3 редуктора со значениями счетчика процессов как 3, 2, 5. В основной функции, если я могу получить доступ к этим значениям и агрегировать их как 10.

Пожалуйста, найдите соответствующий код ниже:

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

      try {
        ....

        processRowCount.set(sum);
        success = true;
      } catch (Exception ex) {
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      super.cleanup(context);
      outputStream.flush();
      outputStream.close();
      while(true) {
        if (channel.isClosed()) {
          break;
        }
        try{ Thread.sleep(1000); } catch(Exception e) {}
      }
      channel.disconnect();
      session.disconnect();
      context.write(new Text("successful"), new Text(String.valueOf(success)));
      context.write(new Text("row_count"), new Text(String.valueOf(processRowCount)));
    }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CommandLine cmd = DistributedTsLoad.parseArgs(args, conf);
    DistributedLoad.fillHadoopConfiguration(cmd, conf);
    Job job = Job.getInstance(conf, "distributed-load");
    job.setJarByClass(DistributedLoad.class);
    job.setMapperClass(TsLoadRowMapper.class);
    job.setReducerClass(TsLoadRowReducer.class);
    job.setNumReduceTasks(conf.getInt("num-reducers", 1));
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    for(Option opt : cmd.getOptions()) {
      switch (opt.getLongOpt()) {
        case "input":
          for (String input : opt.getValues()) {
            Path inputPath = new Path(input);
            FileInputFormat.addInputPath(job, inputPath);
          }
          break;
        case "output":
          Path outputPath = new Path(opt.getValue());
          FileOutputFormat.setOutputPath(job, outputPath);
          break;
      }
    }
    boolean res = job.waitForCompletion(true);
    System.exit(res ? 0 : 1);
  }
...