Класс Hadoop JobConf устарел, нужен обновленный пример - PullRequest
12 голосов
/ 22 декабря 2011

Я пишу программы для hadoop, и я действительно не хочу играть с устаревшими классами.В любом месте онлайн я не могу найти программы с обновленным

org.apache.hadoop.conf.Configuration

классом, состоящим из

org.apache.hadoop.mapred.JobConf

class.

   public static void main(String[] args) throws Exception {
     JobConf conf = new JobConf(Test.class);
     conf.setJobName("TESST");

     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(IntWritable.class);

     conf.setMapperClass(Map.class);
     conf.setCombinerClass(Reduce.class);
     conf.setReducerClass(Reduce.class);

     conf.setInputFormat(TextInputFormat.class);
     conf.setOutputFormat(TextOutputFormat.class);

     FileInputFormat.setInputPaths(conf, new Path(args[0]));
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));

     JobClient.runJob(conf);
   }

Вот так выглядит мой main ().Может пожалуйста кто-нибудь предоставит мне обновленную функцию.

Ответы [ 2 ]

18 голосов
/ 22 декабря 2011

Вот классический пример WordCount.Вы заметите тон других импортеров, которые могут не понадобиться, прочитав код, который вы выясните, какой именно.

Чем отличается?Я использую интерфейс Tool и GenericOptionParser для разбора команды задания aka: hadoop jar ....

В маппере вы заметите вещь запуска.Вы можете избавиться от этого, он обычно вызывается по умолчанию, когда вы предоставляете код для метода Map.Я положил его туда, чтобы дать вам информацию, что вы можете дополнительно контролировать этап картирования.Это все с использованием нового API.Я надеюсь, что вы найдете это полезным.Любые другие вопросы, дайте мне знать!

import java.io.IOException;
import java.util.*;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.GenericOptionsParser;

public class Inception extends Configured implements Tool{

 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }

  public void run (Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
              map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        cleanup(context);
  }
 }

 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }

public int run(String[] args) throws Exception {

    Job job = Job.getInstance(new Configuration());

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setJarByClass(WordCount.class);

    job.submit();
    return 0;
    }

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    ToolRunner.run(new WordCount(), otherArgs);
 }
}
1 голос
/ 31 марта 2015

Также возьмите классический WordCount в качестве примера:

org.apache.hadoop.mapred.JobConf старый, в новой версии мы используем Configuration и Job для достижения.

Пожалуйста, используйте org.apache.hadoop.mapreduce.lib.* (это новый API) вместо org.apache.hadoop.mapred.TextInputFormat (это старый).

Mapper и Reducer не являются чем-то новым, см. Функцию main, она включает в себя относительно общие конфигурации, не стесняйтесь изменять их в соответствии с вашими конкретными требованиями.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  private Text outputKey;
  private IntWritable outputVal;

  @Override
  public void setup(Context context) {
    outputKey = new Text();
    outputVal = new IntWritable(1);
  }

  @Override
  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer stk = new StringTokenizer(value.toString());
    while(stk.hasMoreTokens()) {
      outputKey.set(stk.nextToken());
      context.write(outputKey, outputVal);
    }
  }
}

class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result;

  @Override
  public void setup(Context context) {
    result = new IntWritable();
  }

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for(IntWritable val: values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

public class WordCount {
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    if(args.length != 2) {
      System.err.println("Usage: <in> <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "Word Count");

    // set jar
    job.setJarByClass(WordCount.class);

    // set Mapper, Combiner, Reducer
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    /* Optional, set customer defined Partioner:
     * job.setPartitionerClass(MyPartioner.class);
     */

    // set output key
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // set input and output path
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // by default, Hadoop use TextInputFormat and TextOutputFormat
    // any customer defined input and output class must implement InputFormat/OutputFormat interface
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
...