Я пытаюсь работать с 2 картами и сокращать количество рабочих мест, то есть второе задание зависит от первого. На самом деле, я пытаюсь подсчитать рейтинг фильмов по первым заданиям mapreduce и отсортировать по вторым.
Но по некоторым причинам вторая карта и вторая задача сокращения не работают.
Мой первый картпер
public class MovieRatingsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final IntWritable ONE = new IntWritable(1);
@Override
public void map(LongWritable longWritable, Text text, Context context) {
String line = text.toString();
String[] words = line.split("\t");
context.write(new Text(words[1]), ONE);
}
}
Первый редуктор
public class MovieRatingsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context){
//System.out.println("movie rating reducer is working");
String word = key.toString();
int totalCount = 0;
for (IntWritable value : values) {
int count = value.get();
totalCount += count;
}
context.write(new Text(word), new IntWritable(totalCount));
}
}
Второй картограф
public class MovieRatingsSortMapper extends Mapper<Text, Text, IntWritable, Text> {
IntWritable frequency = new IntWritable();
@Override
public void map(Text key, Text value, Context context) {
System.out.println("Movie Rating sort Mapper is working");
int newVal = Integer.parseInt(value.toString());
frequency.set(newVal);
context.write(frequency, key);
}
}
Второй редуктор
public class MovieRatingsSortReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
Text word = new Text();
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context) {
System.out.println("movie rating sort reducer is working");
for(Text value : values) {
word.set(value);
context.write(key, word);
}
}
}
И Драйвер, Основной метод
public class MovireRatingsSortDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
Path inputDirPath = new Path("src/main/resources/input/movieratings/");
Path outputDirPath = new Path("src/main/resources/output/movieratings/temp/");
Path inputDirPath2 = new Path("src/main/resources/output/movieratings/temp/");
Path outputDirPath2 = new Path("src/main/resources/output/movieratings/");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:/");
conf.set("mapreduce.framework.name", "local");
FileSystem fs = FileSystem.getLocal(conf);
fs.delete(outputDirPath, true);
fs.delete(outputDirPath2, true);
fs.setWriteChecksum(false);
JobControl jobControl = new JobControl("jobChain");
Configuration conf1 = getConf();
Job job1 = Job.getInstance(conf1);
job1.setJarByClass(MovireRatingsSortDriver.class);
job1.setJobName("MovieRatings");
FileInputFormat.addInputPath(job1, inputDirPath);
FileOutputFormat.setOutputPath(job1, outputDirPath);
job1.setMapperClass(MovieRatingsMapper.class);
job1.setCombinerClass(MovieRatingsReducer.class);
job1.setNumReduceTasks(1);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
controlledJob1.setJob(job1);
jobControl.addJob(controlledJob1);
Configuration conf2 = getConf();
Job job2 = Job.getInstance(conf2);
job2.setJarByClass(MovireRatingsSortDriver.class);
job2.setJobName("Sorter");
FileInputFormat.addInputPath(job2, inputDirPath2);
FileOutputFormat.setOutputPath(job2, outputDirPath2);
job2.setMapperClass(MovieRatingsSortMapper.class);
job2.setReducerClass(MovieRatingsSortReducer.class);
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(Text.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
job2.setNumReduceTasks(1);
ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
controlledJob2.setJob(job2);
// make job2 dependent on job1
controlledJob2.addDependingJob(controlledJob1);
// add the job to the job control
jobControl.addJob(controlledJob2);
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
while (!jobControl.allFinished()){
Thread.sleep(500);
}
jobControl.stop();
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MovireRatingsSortDriver(), args);
System.exit(exitCode);
}
}
Первый преобразователь и преобразователь работает хорошо и создает файл результатов в / movieratings / temp, но второй преобразователь никогда не запускается без ошибок или какой-либо информации.
Можете ли вы дать мне несколько идей, почему вторая работа не работает.
Сначала я подумал, что в первом редукторе я даю результат, но во втором маппере я получил входные данные как
Я изменился, но ничего не изменилось. Теперь у меня нет идеи.