Я практикую MapReduce, и у меня есть файл Amazon .tsv, в котором есть список обзоров с рейтингом продуктов. 1 продукт имеет много отзывов и оценку в каждом обзоре. Обзоры также имеют другие данные, такие как user_id, product_name, review_title, ect. Я хочу использовать MapReduce для этого файла, чтобы сгенерировать выходные данные из 3 столбцов: ID продукта, общее количество отзывов и средний рейтинг продукта.
ссылка на файл, который я использую для тестирования: ССЫЛКА (Это sample_us.tsv )
https://gofile.io/?c=wLsv0y
Пока у меня написано следующее, но я получаю несколько ошибки. Пожалуйста, дайте мне знать, если есть какие-либо исправления, которые вы видите, или лучше logi c, которые могут быть реализованы для достижения той же цели. Я использовал oop между прочим.
Mapper:
package stubs;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ReviewMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
int productIndex = 3; //index for productID
int ratingIndex = 7; //index for ratingID
String input = value.toString();
String [] line = input.split("\\t");
String productID = line[productIndex];
String ratingVal = line[ratingIndex];
if((productID.length() > 0) && (ratingVal.length() == 1))
{
int starRating = Integer.valueOf(ratingVal);
context.write(new Text(productID), new IntWritable(starRating));
}
}
}
А затем мой редуктор:
package stubs;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ReviewReducer extends Reducer<Text, IntWritable, Text, Text> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException
{
int reviewCount = 0;
int combineRating = 0;
for(IntWritable value : values)
{
reviewCount++;
combineRating += value.get();
}
int avgRating = (combineRating/reviewCount);
String reviews = Integer.toString(reviewCount);
String ratings = Integer.toString(avgRating);
String result = reviews+ "\t" +ratings;
context.write(key, new Text(result));
}
}
Наконец драйвер:
package stubs;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class AvgRatingReviews {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(AvgRatingReviews.class);
job.setJobName("Review Results");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(ReviewMapper.class);
job.setReducerClass(ReviewReducer.class);
job.setOutputKeyClass(Text.class);;
job.setOutputValueClass(Text.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}