Combiner не вызывается в Hadoop - PullRequest
       4

Combiner не вызывается в Hadoop

0 голосов
/ 18 октября 2019

Привет, ребята. Я новичок в hadoop. Я пытаюсь запустить этот код, который я получил онлайн, чтобы посмотреть, как будет выглядеть вывод, но я думаю, что объединитель не вызывается по какой-то причине. Пожалуйста помоги. Код использует объединитель картографирования и редуктор. преобразователь действительно что-то выводит, но объединитель не получает никакого ввода, и я подозреваю, что это может быть потому, что объединитель не вызывается.

import java.lang.String;
import java.io.IOException;
import java.util.StringTokenizer;
import java.text.SimpleDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.util.Date;
import java.text.ParseException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Reducer.Context;



public class Nyc {

  public static class AvgPassengerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

        // Enums for month
    public enum Months
    {
        JAN, FEB, MAR, APR, MAY, JUN, JUL, AUG, SEP, OCT, NOV, DEC
    }
    // Enums for days
    public enum Days
    {
        MON,TUE,WED,THU,FRI,SAT,SUN
    }
    public void map(Object key, Text value, Context context) throws IOException,
    InterruptedException {

        word.set(value);
        String nextLine =word.toString();

        //each line is splitted using comma
        String [] columns=nextLine.split(",");

        //total number of columns is 18 and first columns is checked to pass first row
                String id = columns[0]; // Get passenger ID.

        if(columns.length==17 && !columns[0].equals("VendorID"))  //id.matches("^-?\\d+$"))
        {
            //Pick up date time format
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                //first column is converted to date
                Date date=df.parse(columns[1]);
                //unique key month wise passanger
                Text monthkey=new Text(Months.values()[date.getMonth()]+"_Total_Passenger");
                context.write(monthkey, new IntWritable(Integer.parseInt(columns[3]))); //column 3 value is 

                //Total trip in a month
                context.write(new Text(Months.values()[date.getMonth()]+"_Total_Trip"), new IntWritable(1));

                //unique key for each day passanger
                Text daykey=new Text(Days.values()[date.getDay()]+"_Day_Total_Passenger");
                context.write(daykey, new IntWritable(Integer.parseInt(columns[3])));

                //Total trip on particular day is intialized with one
                context.write(new Text(Days.values()[date.getDay()]+"_Day_Total_Trip"), new IntWritable(1));

                context.write(new Text("Total_Trip_Number"), new IntWritable(1));
                context.write(new Text("Total_Passenger_Avg"), new IntWritable(Integer.parseInt(columns[3])));
            } catch (ParseException e) {
                e.printStackTrace();
            }

        }


    }
  }


public class AvgPassengerCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws
    IOException, InterruptedException {
    // combiner for each class is almost same only output value may be change as per requirement
        int sum = 0;
        for (IntWritable val : values) {
        sum += val.get();
        }
    result.set(sum);
    context.write(key, result);
    }
}

public class AvgPassengerReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
    private IntWritable result = new IntWritable(); 
    float totalPassenger = 0;
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws
    IOException, InterruptedException { 
            int sumVal = 0;
            if(key.find("Passenger")>0) //if passanger is present in key
            {
                for (IntWritable val : values)
                {
                    sumVal+=val.get();  
                }
                totalPassenger=sumVal; //total passanger

            //every key is written to disk for varification
            context.write(key, new FloatWritable(sumVal));
            }
            else
            {//passanger count
                for (IntWritable val : values)
                {
                    sumVal+=val.get();  
                }

            context.write(key, new FloatWritable(sumVal)); // passanger count with key write on disk
            String [] pasKey= key.toString().split("_");

            //Average passanger is calculated and write to disk
            context.write(new Text(pasKey[0]+"_Passenger_Avg"), new FloatWritable(totalPassenger/sumVal));
            }


     }
}







 public static void main(String[] args) throws Exception {

     Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Nyc");
    job.setJarByClass(Nyc.class);
    job.setMapperClass(AvgPassengerMapper.class);
    job.setCombinerClass(AvgPassengerCombiner.class);
    job.setReducerClass(AvgPassengerReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }
}

Вот выходной синпет

сильный текст

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...