Mapreduce как связать Mapper >> Редуктор >> Редуктор - PullRequest
0 голосов
/ 22 марта 2019

У меня проблема с цепочкой

Mapper >> Редуктор >> Редуктор

Это мои данные:

Dpt.csv

EmpNo1, DeptNo1
EmpNo2, DeptNo2
EmpNo3, DeptNo1
EmpNo4, DeptNo2
EmpNo5, DeptNo2
EmpNo6, DeptNo1

Emp.csv

EmpNo1,10000
EmpNo2,4675432
EmpNo3,76568658
EmpNo4,241423
EmpNo5,75756
EmpNo6,9796854

И, наконец, я хочу что-то вроде этого:

Dept1 >> Total_Salary_Dept_1

Одна из основных проблем заключается в том, что мой первый редуктор не вызывается, когда я использую несколько файлов в качестве входных данных.

Вторая проблема заключается в том, что я не могу передать этот вывод следующему редуктору. (ChainReducer не может соединить 2 редуктора)

Я использовал этот в качестве ссылки, но быстро понял, что это не поможет.

Я нашел эту ссылку, где в одном из комментариев автор говорит следующее: "В серии Hadoop 2.X внутренне вы можете связывать преобразователи перед редуктором с помощью ChainMapper и цепочки Картографы после редуктора с ChainReducer. "

Означает ли это, что у меня будет такая структура:

Chain Mapper (mapper 1) -> Цепной редуктор (редуктор 1) -> ChainMapper (ненужный картограф) -> Цепной редуктор (редуктор 2)

И если это так, то как именно данные передаются из Редуктора 1 в Картер 2?

Может кто-нибудь мне помочь?

Пока это мой код.

Спасибо.

package Aggregate;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Sales extends Configured implements Tool{

    public static class CollectionMapper extends Mapper<LongWritable, Text, Text, Text>{


        public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {

            String[] vals = value.toString().split(",");                        
            context.write(new Text(vals[0]), new Text(vals[1]));

         }
    }

    public static class DeptSalaryJoiner extends Reducer<Text, Text, Text, Text>{

        public void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException {

            ArrayList<String> DeptSal = new ArrayList<>();

            for (Text val : values) {

                DeptSal.add(val.toString());

            }      
            context.write(new Text(DeptSal.get(0)), new Text(DeptSal.get(1)));
        }
    }

    public static class SalaryAggregator extends Reducer<Text, Text, Text, IntWritable>{

        public void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException {

            Integer totalSal = 0;
            for (Text val : values) {
                Integer salary = new Integer(val.toString());
                totalSal += salary; 

            }      
            context.write(key, new IntWritable(totalSal));
        }
    }

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new Sales(), args);
        System.exit(exitFlag);
    }

    @Override
    public int run(String[] args) throws Exception {

        String input1 = "./emp.csv";
        String input2 = "./dept.csv";
        String output = "./DeptAggregate";

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Sales");
        job.setJarByClass(getClass());

        Configuration mapConf = new Configuration(false);
        ChainMapper.addMapper(job, CollectionMapper.class, LongWritable.class, Text.class, Text.class, Text.class,  mapConf);

        Configuration reduce1Conf = new Configuration(false);
        ChainReducer.setReducer(job, DeptSalaryJoiner.class, Text.class, Text.class, Text.class, Text.class, reduce1Conf);

        Configuration reduce2Conf = new Configuration(false);
        ChainReducer.setReducer(job, SalaryAggregator.class, Text.class, Text.class, Text.class, IntWritable.class, reduce2Conf);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(input1));
        FileInputFormat.addInputPath(job, new Path(input2));

        try {
            File f = new File(output);
            FileUtils.forceDelete(f);
        } catch (Exception e) {

        }

        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...