Java Hadoop MapReduce Chaining Job - PullRequest
       29

Java Hadoop MapReduce Chaining Job

1 голос
/ 16 октября 2019

У меня есть код, который правильно выбирает источник и максимальный вес. Я не могу тянуть целевой столб также. кто-то может указать мне правильное направление? Я никогда не использовал Java раньше. Я думаю, что функция редуктора должна возвращать кортеж. следовательно, должен ли переменный target в функции mapper иметь этот кортеж?

Желаемый вывод: каждая строка содержит идентификатор узла, за которым следует табуляция (\ t) и ожидаемый кортеж «tgt, weight». Кортеж - это тгт с наибольшим весом. В случае ничьей верните tgt с наименьшим номером.

INPUT

src        tgt        weight

1        110        3

1        200        1

20        150        30

10        110        10

11        130        15

11        200        67

1        70        3

EXPECTED OUTPUT

1        70,3

20        150,30

10        110,10

11        200,67

CURRENT OUTPUT (необходимо добавить встолбец tgt как кортеж)

1        3

20        30

10        10

11        67
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Q1 {


  public static class TargetMapper extends Mapper<Object, Text, Text, IntWritable> {

      private Text target = new Text();
      public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString(), "\r");
            while (st.hasMoreTokens()) {
                String[] edge = st.nextToken().split("\t");
                target.set(edge[0]);
                context.write(target, new IntWritable(Integer.parseInt(edge[2])));
            }
        }

    }

  public static class EmailsReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

      private IntWritable totalCount = new IntWritable();  
      public void reduce(Text key, Iterable<IntWritable> targets, Context context) throws IOException, InterruptedException{

            int max = 0;

            for (IntWritable target : targets)  {
                if(target.get() > max || max ==0) {
                    max = target.get();
                }
            }

            totalCount.set(max);

            context.write(key, totalCount);


        }
    }




    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Q1");

        job.setJarByClass(Q1.class);
        job.setMapperClass(TargetMapper.class);
        job.setCombinerClass(EmailsReducer.class);
        job.setReducerClass(EmailsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
...