У меня проблема с цепочкой
Mapper >> Редуктор >> Редуктор
Это мои данные:
Dpt.csv
EmpNo1, DeptNo1
EmpNo2, DeptNo2
EmpNo3, DeptNo1
EmpNo4, DeptNo2
EmpNo5, DeptNo2
EmpNo6, DeptNo1
Emp.csv
EmpNo1,10000
EmpNo2,4675432
EmpNo3,76568658
EmpNo4,241423
EmpNo5,75756
EmpNo6,9796854
И, наконец, я хочу что-то вроде этого:
Dept1 >> Total_Salary_Dept_1
Одна из основных проблем заключается в том, что мой первый редуктор не вызывается, когда я использую несколько файлов в качестве входных данных.
Вторая проблема заключается в том, что я не могу передать этот вывод следующему редуктору. (ChainReducer не может соединить 2 редуктора)
Я использовал этот в качестве ссылки, но быстро понял, что это не поможет.
Я нашел эту ссылку, где в одном из комментариев автор говорит следующее: "В серии Hadoop 2.X внутренне вы можете связывать преобразователи перед редуктором с помощью ChainMapper и цепочки Картографы после редуктора с ChainReducer. "
Означает ли это, что у меня будет такая структура:
Chain Mapper (mapper 1) -> Цепной редуктор (редуктор 1) -> ChainMapper (ненужный картограф) -> Цепной редуктор (редуктор 2)
И если это так, то как именно данные передаются из Редуктора 1 в Картер 2?
Может кто-нибудь мне помочь?
Пока это мой код.
Спасибо.
package Aggregate;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Sales extends Configured implements Tool{
public static class CollectionMapper extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] vals = value.toString().split(",");
context.write(new Text(vals[0]), new Text(vals[1]));
}
}
public static class DeptSalaryJoiner extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
ArrayList<String> DeptSal = new ArrayList<>();
for (Text val : values) {
DeptSal.add(val.toString());
}
context.write(new Text(DeptSal.get(0)), new Text(DeptSal.get(1)));
}
}
public static class SalaryAggregator extends Reducer<Text, Text, Text, IntWritable>{
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Integer totalSal = 0;
for (Text val : values) {
Integer salary = new Integer(val.toString());
totalSal += salary;
}
context.write(key, new IntWritable(totalSal));
}
}
public static void main(String[] args) throws Exception {
int exitFlag = ToolRunner.run(new Sales(), args);
System.exit(exitFlag);
}
@Override
public int run(String[] args) throws Exception {
String input1 = "./emp.csv";
String input2 = "./dept.csv";
String output = "./DeptAggregate";
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Sales");
job.setJarByClass(getClass());
Configuration mapConf = new Configuration(false);
ChainMapper.addMapper(job, CollectionMapper.class, LongWritable.class, Text.class, Text.class, Text.class, mapConf);
Configuration reduce1Conf = new Configuration(false);
ChainReducer.setReducer(job, DeptSalaryJoiner.class, Text.class, Text.class, Text.class, Text.class, reduce1Conf);
Configuration reduce2Conf = new Configuration(false);
ChainReducer.setReducer(job, SalaryAggregator.class, Text.class, Text.class, Text.class, IntWritable.class, reduce2Conf);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(input1));
FileInputFormat.addInputPath(job, new Path(input2));
try {
File f = new File(output);
FileUtils.forceDelete(f);
} catch (Exception e) {
}
FileOutputFormat.setOutputPath(job, new Path(output));
return job.waitForCompletion(true) ? 0 : 1;
}
}