Я работаю в mapreduce в Hadoop. Все работает нормально, но в выходном файле ничего не написано. Я использовал приведенный ниже код для запуска задания.
Я не могу найти, в чем проблема. Почему он не пишет в вывод? Это для анализа отзывов Amazon. Работа, которая подсчитывает за каждый месяц топ-5 продуктов с самым высоким средним баллом.
Вывод должен включать ProduceId
и среднюю оценку, отсортированную по времени.
public class TopHighestScore {
private static final int TOP_K = 5;
/**
* Returns a month with the ReviewWritable object
(= productID and the rating)
* @author fabrizio
*
*/
public static class Mapper1 extends
Mapper<LongWritable,Text,Text,ReviewWritable> {
private static ReviewWritable REVIEW = new ReviewWritable();
@Override
public void map(LongWritable key, Text value,
Context ctx)
throws IOException, InterruptedException {
String[] cols = (value.toString()).split("\t");
//check data correctness
if(cols!=null && cols.length==10){
try {
String prodID =
cols[AmazonFoodReviewsColumns.PROD_ID];
int score =
Integer.parseInt(cols[AmazonFoodReviewsColumns.SCORE]);
long date =
Long.parseLong(cols[AmazonFoodReviewsColumns.TIME]);
String month = null;
if(prodID!=null){
Date time=
new Date(date*1000);
SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM");
month = sdf.format(time);
if(month!=null){
REVIEW.set(new Text(prodID), new DoubleWritable(score));
ctx.write(new Text(month), REVIEW);
}
}
}
catch(Exception e){
e.printStackTrace();
}
}
}
}
/**
* Computes top K products for each month
* @author fabrizio
*
*/
public static class Reducer1 extends
Reducer<Text,ReviewWritable,Text,ReviewWritable> {
private Map<String, ProductMean> TimeProduct2Mean =
new HashMap<String, ProductMean>();
private static ReviewWritable REVIEW = new ReviewWritable();
protected class ProductMean implements
Comparable<ProductMean>{
String productID;
Double sum;
Integer counter;//number of ratings
Double mean;
ProductMean(String productID, Double sum){
this.productID = productID;
this.sum = sum;
this.counter = 1;
}
void increase(Double value){
this.sum = this.sum + value;
this.counter = this.counter + 1;
}
void computeMean(){
this.mean = this.sum/this.counter;
}
public int compareTo(ProductMean obj) {
if (this.mean < obj.mean) return 1;
if (this.mean > obj.mean) return -1;
return this.productID.compareTo(obj.productID);
}
public String toString(){
return productID+" mean:"+mean;
}
}
/*
* Prepare the map that will be parsed in cleanup.
* The map is TIME_ProdID -> ProductMean
* ProductMean contains the product ID and the mean
* @see org.apache.hadoop.mapreduce.Reducer#reduce
*/
@Override
public void reduce(Text key,
Iterable<ReviewWritable> values, Context ctx)
throws IOException, InterruptedException {
for (ReviewWritable value : values) {
//combine month to product ID
String productID = value.getProductID().toString();
String timeProduct = key.toString()+"_"+productID;
if(TimeProduct2Mean.containsKey(timeProduct)){
TimeProduct2Mean.get(timeProduct).increase((double)value.
getScore().get());
}
else{
TimeProduct2Mean.put(timeProduct,
new ProductMean(productID, (double)value.getScore().get()));
}
}
}
@Override
protected void cleanup(Context ctx)
throws IOException, InterruptedException {
//map time 2 sorted products
Map<String, Set<ProductMean>> result =
new TreeMap<String, Set<ProductMean>>();
for(String time2productID: this.TimeProduct2Mean.keySet()){
String[] parts = time2productID.split("_");
String time = parts[0];
//it's time to compute the mean
ProductMean element = this.TimeProduct2Mean.get(time2productID);
element.computeMean();
//inserting elements in descending order defined by compareTo of
ProductMean
if(result.get(time)==null){
Set<ProductMean> sortedResults = new TreeSet<ProductMean>();
sortedResults.add(element);
result.put(time, sortedResults);
}
else
result.get(time).add(element);
}
//getting the TOP_K products per month
for(String time: result.keySet()){
int counter = 1;
for(ProductMean mean: result.get(time)){
if(counter>TOP_K)
break;
REVIEW.set(new Text(mean.productID), new DoubleWritable(mean.mean));
ctx.write(new Text(time), REVIEW);
counter++;
}
}
}
}
/**
* Do the job
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
long start=System.currentTimeMillis();
Configuration conf = new Configuration();
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage:TopHighestScore <directory-in>
<directory-out>");
System.exit(2);
}
Job job = Job.getInstance(conf);
job.setJobName("TopHighestScore-pass-1");
job.setJarByClass(TopHighestScore.class);
job.setMapperClass(Mapper1.class);
job.setReducerClass(Reducer1.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ReviewWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ReviewWritable.class);
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
int flag = job.waitForCompletion(true) ? 0 : 1;
long end=System.currentTimeMillis();
System.out.println("#Execution time in seconds : "+ (end- start)/1000.0);
System.exit(flag);
}
Вывод:
Map-Reduce Framework
Map input records=5116093
Map output records=0
Map output bytes=0
Map output materialized bytes=18
Input split bytes=291
Combine input records=0
Combine output records=0
Reduce input groups=0
Reduce shuffle bytes=18
Reduce input records=0
Reduce output records=0
Spilled Records=0
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=14952
CPU time spent (ms)=38174
Physical memory (bytes) snapshot=943157248
Virtual memory (bytes) snapshot=1103048704
Total committed heap usage (bytes)=655884288
Нет ошибок, кроме записей MapOutput= 0, мой выходной файл part-r-00000 пуст. Помоги мне с кодом. Спасибо!