Hadoop JobControl, вторая работа не работает - PullRequest
0 голосов
/ 14 марта 2019

Я пытаюсь работать с 2 картами и сокращать количество рабочих мест, то есть второе задание зависит от первого. На самом деле, я пытаюсь подсчитать рейтинг фильмов по первым заданиям mapreduce и отсортировать по вторым. Но по некоторым причинам вторая карта и вторая задача сокращения не работают.

Мой первый картпер

public class MovieRatingsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    public void map(LongWritable longWritable, Text text, Context context) {
        String line = text.toString();
        String[] words = line.split("\t");

        context.write(new Text(words[1]), ONE); 
    }
}

Первый редуктор

    public class MovieRatingsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context){
            //System.out.println("movie rating reducer is working");
            String word = key.toString();
            int totalCount = 0;
            for (IntWritable value : values) {
                int count = value.get();
                totalCount += count;
            }

            context.write(new Text(word), new IntWritable(totalCount));
        }
    }

Второй картограф

public class MovieRatingsSortMapper extends Mapper<Text, Text, IntWritable, Text> {

    IntWritable frequency = new IntWritable();

    @Override
    public void map(Text key, Text value, Context context) {

        System.out.println("Movie Rating sort Mapper is working");

        int newVal = Integer.parseInt(value.toString());
        frequency.set(newVal);

        context.write(frequency, key);
    }
}

Второй редуктор

public class MovieRatingsSortReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    Text word = new Text();

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) {

        System.out.println("movie rating sort reducer is working");

        for(Text value : values) {
            word.set(value);
            context.write(key, word);
        }
    }
}

И Драйвер, Основной метод

public class MovireRatingsSortDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        Path inputDirPath = new Path("src/main/resources/input/movieratings/");
        Path outputDirPath = new Path("src/main/resources/output/movieratings/temp/");

        Path inputDirPath2 = new Path("src/main/resources/output/movieratings/temp/");
        Path outputDirPath2 = new Path("src/main/resources/output/movieratings/");

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:/");
        conf.set("mapreduce.framework.name", "local");
        FileSystem fs = FileSystem.getLocal(conf);
        fs.delete(outputDirPath, true);
        fs.delete(outputDirPath2, true);
        fs.setWriteChecksum(false);

        JobControl jobControl = new JobControl("jobChain");
        Configuration conf1 = getConf();

        Job job1 = Job.getInstance(conf1);
        job1.setJarByClass(MovireRatingsSortDriver.class);
        job1.setJobName("MovieRatings");

        FileInputFormat.addInputPath(job1, inputDirPath);
        FileOutputFormat.setOutputPath(job1, outputDirPath);

        job1.setMapperClass(MovieRatingsMapper.class);
        job1.setCombinerClass(MovieRatingsReducer.class);
        job1.setNumReduceTasks(1);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
        controlledJob1.setJob(job1);

        jobControl.addJob(controlledJob1);

        Configuration conf2 = getConf();

        Job job2 = Job.getInstance(conf2);
        job2.setJarByClass(MovireRatingsSortDriver.class);
        job2.setJobName("Sorter");

        FileInputFormat.addInputPath(job2, inputDirPath2);
        FileOutputFormat.setOutputPath(job2, outputDirPath2);

        job2.setMapperClass(MovieRatingsSortMapper.class);
        job2.setReducerClass(MovieRatingsSortReducer.class);

        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);     

        job2.setNumReduceTasks(1);

        ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
        controlledJob2.setJob(job2);

        // make job2 dependent on job1
        controlledJob2.addDependingJob(controlledJob1);

        // add the job to the job control
        jobControl.addJob(controlledJob2);

        Thread jobControlThread = new Thread(jobControl);
        jobControlThread.start();

        while (!jobControl.allFinished()){
            Thread.sleep(500);
        }

        jobControl.stop();
        return 0;



    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MovireRatingsSortDriver(), args);
        System.exit(exitCode);

    }

}

Первый преобразователь и преобразователь работает хорошо и создает файл результатов в / movieratings / temp, но второй преобразователь никогда не запускается без ошибок или какой-либо информации.

Можете ли вы дать мне несколько идей, почему вторая работа не работает. Сначала я подумал, что в первом редукторе я даю результат, но во втором маппере я получил входные данные как Я изменился, но ничего не изменилось. Теперь у меня нет идеи.

...