MapReduce: Рассчитать средний рейтинг и общее количество отзывов о товаре - PullRequest
0 голосов
/ 24 января 2020

Я практикую MapReduce, и у меня есть файл Amazon .tsv, в котором есть список обзоров с рейтингом продуктов. 1 продукт имеет много отзывов и оценку в каждом обзоре. Обзоры также имеют другие данные, такие как user_id, product_name, review_title, ect. Я хочу использовать MapReduce для этого файла, чтобы сгенерировать выходные данные из 3 столбцов: ID продукта, общее количество отзывов и средний рейтинг продукта.

ссылка на файл, который я использую для тестирования: ССЫЛКА (Это sample_us.tsv )

https://gofile.io/?c=wLsv0y

Пока у меня написано следующее, но я получаю несколько ошибки. Пожалуйста, дайте мне знать, если есть какие-либо исправления, которые вы видите, или лучше logi c, которые могут быть реализованы для достижения той же цели. Я использовал oop между прочим.

Mapper:

package stubs;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReviewMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{

  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException
  {

      int productIndex = 3; //index for productID
      int ratingIndex = 7; //index for ratingID

      String input = value.toString();
      String [] line = input.split("\\t");

      String productID = line[productIndex];
      String ratingVal = line[ratingIndex];


      if((productID.length() > 0) && (ratingVal.length() == 1))
      {
         int starRating = Integer.valueOf(ratingVal);
         context.write(new Text(productID), new IntWritable(starRating));
      }
  }
}

А затем мой редуктор:

package stubs;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReviewReducer extends Reducer<Text, IntWritable, Text, Text> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException
  {
      int reviewCount = 0;
      int combineRating = 0;
      for(IntWritable value : values)
      {
          reviewCount++;
          combineRating += value.get();
      }

      int avgRating = (combineRating/reviewCount);
      String reviews = Integer.toString(reviewCount);
      String ratings = Integer.toString(avgRating);
      String result = reviews+ "\t" +ratings;

      context.write(key,  new Text(result));
  }
}

Наконец драйвер:

package stubs;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class AvgRatingReviews {

  public static void main(String[] args) throws Exception {

    if (args.length != 2) {
    System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
    System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(AvgRatingReviews.class);  
    job.setJobName("Review Results");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job,  new Path(args[1]));

    job.setMapperClass(ReviewMapper.class);
    job.setReducerClass(ReviewReducer.class);

    job.setOutputKeyClass(Text.class);;
    job.setOutputValueClass(Text.class);

    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
  }
}
...