Пустой выходной файл после map-фазы (MapReduce) - PullRequest
0 голосов
/ 08 октября 2019

Я работаю в mapreduce в Hadoop. Все работает нормально, но в выходном файле ничего не написано. Я использовал приведенный ниже код для запуска задания.

Я не могу найти, в чем проблема. Почему он не пишет в вывод? Это для анализа отзывов Amazon. Работа, которая подсчитывает за каждый месяц топ-5 продуктов с самым высоким средним баллом.

Вывод должен включать ProductId и среднюю оценку, отсортированные по времени.

public class TopHighestScore {



 private static final int TOP_K = 5;



/**
 * Maps one raw review line to (month, ReviewWritable(productID, rating)).
 *
 * <p>Input lines are expected to be tab-separated with exactly 10 columns
 * (see {@code AmazonFoodReviewsColumns}). Malformed lines are skipped and
 * counted so the job counters explain an empty output instead of hiding it.
 *
 * @author fabrizio
 */
 public static class Mapper1 extends
     Mapper<LongWritable, Text, Text, ReviewWritable> {

    // Reused output objects: Hadoop serializes on ctx.write(), so reusing
    // them between map() calls is safe and avoids per-record allocation.
    private static ReviewWritable REVIEW = new ReviewWritable();
    private final Text monthKey = new Text();

    // SimpleDateFormat is not thread-safe, but each map task is
    // single-threaded, so one per-instance formatter is safe and avoids
    // re-allocating it for every input record.
    private final SimpleDateFormat monthFormat = new SimpleDateFormat("yyyy-MM");

    @Override
    public void map(LongWritable key, Text value, Context ctx)
            throws IOException, InterruptedException {

        // NOTE(review): the reported "Map output records=0" means every line
        // failed one of the guards below. Counters make the reason visible
        // in the job summary instead of silently dropping rows.
        String[] cols = value.toString().split("\t");

        if (cols == null || cols.length != 10) {
            // Most likely cause of an empty output: the input is not
            // tab-separated 10-column data (e.g. CSV input or a header line).
            ctx.getCounter("Mapper1", "MALFORMED_ROW").increment(1);
            return;
        }

        try {
            String prodID = cols[AmazonFoodReviewsColumns.PROD_ID];
            int score = Integer.parseInt(cols[AmazonFoodReviewsColumns.SCORE]);
            long date = Long.parseLong(cols[AmazonFoodReviewsColumns.TIME]);

            if (prodID == null) {
                ctx.getCounter("Mapper1", "MISSING_PRODUCT_ID").increment(1);
                return;
            }

            // The timestamp column is in seconds; java.util.Date expects ms.
            String month = monthFormat.format(new Date(date * 1000L));

            REVIEW.set(new Text(prodID), new DoubleWritable(score));
            monthKey.set(month);
            ctx.write(monthKey, REVIEW);
        } catch (Exception e) {
            // Unparseable score/time: count it rather than silently skipping.
            ctx.getCounter("Mapper1", "PARSE_ERROR").increment(1);
        }
    }
}



/**

 * Computes top K products for each month

 * @author fabrizio

 *

 */

public static class Reducer1 extends  
    Reducer<Text,ReviewWritable,Text,ReviewWritable> {



    private Map<String, ProductMean> TimeProduct2Mean =
                  new HashMap<String, ProductMean>();

    private static ReviewWritable REVIEW = new ReviewWritable();



    protected class ProductMean implements 
               Comparable<ProductMean>{

        String productID;

        Double sum;
        Integer counter;//number of ratings

        Double mean;
        ProductMean(String productID, Double sum){

            this.productID = productID;

            this.sum = sum;

            this.counter = 1;

        }

        void increase(Double value){

            this.sum = this.sum + value;

            this.counter = this.counter + 1;

        }

        void computeMean(){

            this.mean = this.sum/this.counter;

        }

        public int compareTo(ProductMean obj) {

            if (this.mean < obj.mean) return 1;

            if (this.mean > obj.mean) return -1;

        return this.productID.compareTo(obj.productID);

        }

        public String toString(){

            return productID+" mean:"+mean;

        }

    }



    /*

     * Prepare the map that will be parsed in cleanup.

     * The map is TIME_ProdID -> ProductMean

     * ProductMean contains the product ID and the mean 

     * @see org.apache.hadoop.mapreduce.Reducer#reduce

     */

    @Override

    public void reduce(Text key, 
              Iterable<ReviewWritable> values, Context ctx) 
             throws IOException, InterruptedException {



        for (ReviewWritable value : values) {           

            //combine month to product ID

        String productID = value.getProductID().toString();

        String timeProduct = key.toString()+"_"+productID;

        if(TimeProduct2Mean.containsKey(timeProduct)){

  TimeProduct2Mean.get(timeProduct).increase((double)value.
       getScore().get());

            }

            else{

TimeProduct2Mean.put(timeProduct, 
    new ProductMean(productID, (double)value.getScore().get()));

            }

        }           

    }



    @Override

    protected void cleanup(Context ctx) 
            throws IOException, InterruptedException {



        //map time 2 sorted products

Map<String, Set<ProductMean>> result =
   new TreeMap<String, Set<ProductMean>>();

for(String time2productID: this.TimeProduct2Mean.keySet()){

            String[] parts = time2productID.split("_");

            String time = parts[0]; 



            //it's time to compute the mean

ProductMean element = this.TimeProduct2Mean.get(time2productID);

            element.computeMean();


//inserting elements in descending order defined by compareTo of  
    ProductMean

            if(result.get(time)==null){

    Set<ProductMean> sortedResults = new TreeSet<ProductMean>();

                sortedResults.add(element);

                result.put(time, sortedResults);

            }

            else

                result.get(time).add(element);  

        }



        //getting the TOP_K products per month

        for(String time: result.keySet()){

            int counter = 1;

            for(ProductMean mean: result.get(time)){

                if(counter>TOP_K)

                    break;

REVIEW.set(new Text(mean.productID), new DoubleWritable(mean.mean));

                ctx.write(new Text(time), REVIEW);

                counter++;

            }

        }

    }

}



/**

 * Do the job

 * @param args

 * @throws Exception

 */

public static void main(String[] args) throws Exception {



    long start=System.currentTimeMillis();



    Configuration conf = new Configuration();

   String[] otherArgs = 
   new GenericOptionsParser(conf,    args).getRemainingArgs();

    if (otherArgs.length != 2) {

    System.err.println("Usage:TopHighestScore <directory-in> 
            <directory-out>");

        System.exit(2);

    }



    Job job = Job.getInstance(conf);

    job.setJobName("TopHighestScore-pass-1");

    job.setJarByClass(TopHighestScore.class);

    job.setMapperClass(Mapper1.class);

    job.setReducerClass(Reducer1.class);

    job.setInputFormatClass(TextInputFormat.class);

    job.setMapOutputKeyClass(Text.class);

    job.setMapOutputValueClass(ReviewWritable.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(ReviewWritable.class);





    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));



    int flag = job.waitForCompletion(true) ? 0 : 1;

    long end=System.currentTimeMillis();

    System.out.println("#Execution time in seconds : "+ (end-                start)/1000.0);



    System.exit(flag);

}

Вывод:

Map-Reduce Framework
                Map input records=5116093
                Map output records=0
                Map output bytes=0
                Map output materialized bytes=18
                Input split bytes=291
                Combine input records=0
                Combine output records=0
                Reduce input groups=0
                Reduce shuffle bytes=18
                Reduce input records=0
                Reduce output records=0
                Spilled Records=0
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=14952
                CPU time spent (ms)=38174
                Physical memory (bytes) snapshot=943157248
                Virtual memory (bytes) snapshot=1103048704
                Total committed heap usage (bytes)=655884288

Ошибок нет, но счётчик Map output records = 0, и мой выходной файл part-r-00000 пуст. Помогите мне с кодом. Спасибо!

...