У меня есть следующая схема выводов
public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>
- Ввод: текст / текст
- Вывод: текст / перечисление (RecordWritable, my ownкласс)
public static class JoinSumReducer extends Reducer<Text, RecordWritable, Text, DoubleWritable>
- Ввод: Text / Enum (RecordWritable, мой собственный класс)
- Вывод: Text / Double
Я получаю следующее исключение времени выполнения java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
(полная трассировка стека после кода).
Я пробовал решение, предложенное Несоответствие типов в значении с карты: ожидаемый org.apache.hadoop.io.NullWritable, получено org.apache.hadoop.io.Text , но это приводит к исключению времени выполнения: java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable
(полная трассировка стека после кода).
Очевидно, что где-то есть несоответствие типов, но я следовал всем определениям значений и не могу найти то, что мне не хватает.Есть ли другое место, где мне нужно определить, какие типы используются?
Вот мой код
Переписываемый класс Enum
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* Writable for enum Record
*/
public class RecordWritable implements Writable{
public static enum Record {BUY, CLICK};
private Record data;
public void set(Record data) {
this.data = data;
}
public Record get() {
return this.data;
}
public void readFields(DataInput dataInput) throws IOException {
data = WritableUtils.readEnum(dataInput, Record.class);
}
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeEnum(dataOutput,data);
}
}
Mapper / Reducer и Main
import java.io.IOException;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SuccessRate {
/**
* Mapper
* - Key = ItemID
* - Value = The type of record is determined by number of columns
*/
public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>{
private Text itemID = new Text();
private RecordWritable record = new RecordWritable();
Pattern itemIDpattern = Pattern.compile("^(\\d+),");
Pattern columnPattern = Pattern.compile(",");
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
Scanner itr = new Scanner(value.toString());
while (itr.hasNextLine()) {
String line = itr.nextLine();
String id = null;
Matcher m = itemIDpattern.matcher(line);
if(m.find())
id = m.group(1);
RecordWritable.Record fileType;
int count = StringUtils.countMatches(line, ",");
if(count==4)
fileType = RecordWritable.Record.CLICK;
else
fileType = RecordWritable.Record.BUY;
if(id != null) {
itemID.set(id);
record.set(fileType);
context.write(itemID, record);
}
}
itr.close();
}
}
/**
* Reducer
* - Key : ItemID
* - Value : sum of buys / sum of clicks
*/
public static class JoinSumReducer
extends Reducer<Text, RecordWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
public void reduce(Text key, Iterable<RecordWritable> values,
Context context
) throws IOException, InterruptedException {
int sumClick = 0;
int sumBuy = 0;
for (RecordWritable val : values) {
switch(val.get()) {
case CLICK:
sumClick += 1;
break;
case BUY:
sumBuy += 1;
break;
}
}
result.set((double)sumBuy/(double)sumClick);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "success rate");
job.setJarByClass(SuccessRate.class);
job.setMapperClass(RecordMapper.class);
job.setCombinerClass(JoinSumReducer.class);
job.setReducerClass(JoinSumReducer.class);
// job.setMapOutputKeyClass(Text.class); // I tried adding these two lines after reading https://stackoverflow.com/q/16926783/3303546
// job.setMapOutputValueClass(RecordWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Полная трассировка стека исключений
Исходная ошибка
java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1093)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:727)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at SuccessRate$RecordMapper.map(SuccessRate.java:54)
at SuccessRate$RecordMapper.map(SuccessRate.java:26)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Я попробовал решение, предложенное Несоответствие типов в значении из карты: ожидаемый org.apache.hadoop.io.NullWritable, получено org.apache.hadoop.io.Text , но это приводит к исключению времени выполнения:
2018-09-24 11:36:04,423 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@5532c2f8
java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1562)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1879)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:86)
at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:66)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1900)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1662)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1505)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:735)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2076)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:809)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)