I am running a Hadoop job and trying to write its output to Cassandra. I get the following exception:
java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to java.nio.ByteBuffer
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter.write(ColumnFamilyRecordWriter.java:60)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:514)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
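Two details stand out in the trace: the cast fails inside ColumnFamilyRecordWriter.write, and the frame above it is the base org.apache.hadoop.mapreduce.Reducer.reduce, not my Reduce class. From my reading of the Cassandra hadoop package, the record writer is shaped roughly like this (a from-memory sketch, not the verbatim Cassandra source):

// Approximate shape only -- reconstructed from memory, not copied from the
// Cassandra source. Whatever arrives at write() as the key must already be
// a ByteBuffer; a Text key is what blows up at ColumnFamilyRecordWriter.java:60.
class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> {
    public void write(ByteBuffer keybuff, List<Mutation> value)
            throws IOException, InterruptedException {
        // queues the mutations for the row identified by keybuff
    }

    public void close(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // flushes any pending mutations
    }
}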
I modeled my map-reduce code on the WordCount example at https://wso2.org/repos/wso2/trunk/carbon/dependencies/cassandra/contrib/word_count/src/WordCount.java
Here is my MR code:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SentimentAnalysis extends Configured implements Tool {

    static final String KEYSPACE = "Travel";
    static final String OUTPUT_COLUMN_FAMILY = "Keyword_PtitleId";

    // Sentiment and PojoMapper are application classes of mine (not shown here).
    public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            Sentiment sentiment = null;
            try {
                sentiment = (Sentiment) PojoMapper.fromJson(line, Sentiment.class);
            } catch (Exception e) {
                return; // skip lines that do not parse as JSON
            }
            if (sentiment != null && sentiment.isLike()) {
                // emit (keyword, ptitleId) for every liked sentiment
                word.set(sentiment.getNormKeyword());
                context.write(word, new LongWritable(sentiment.getPtitleId()));
            }
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, ByteBuffer, List<Mutation>> {
        private ByteBuffer outputKey;

        public void reduce(Text key, Iterator<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // count how often each ptitleId occurs for this keyword;
            // java.util.Map is fully qualified because the nested Mapper
            // class above is also named Map
            List<Long> ptitles = new ArrayList<Long>();
            java.util.Map<Long, Integer> ptitleToFrequency = new HashMap<Long, Integer>();
            while (values.hasNext()) {
                Long value = values.next().get();
                ptitles.add(value);
            }
            for (Long ptitle : ptitles) {
                if (ptitleToFrequency.containsKey(ptitle)) {
                    ptitleToFrequency.put(ptitle, ptitleToFrequency.get(ptitle) + 1);
                } else {
                    ptitleToFrequency.put(ptitle, 1);
                }
            }
            // convert the Text key into the ByteBuffer row key Cassandra expects
            byte[] keyBytes = key.getBytes();
            outputKey = ByteBuffer.wrap(Arrays.copyOf(keyBytes, keyBytes.length));
            for (Long ptitle : ptitleToFrequency.keySet()) {
                context.write(outputKey, Collections.singletonList(
                        getMutation(new Text(ptitle.toString()), ptitleToFrequency.get(ptitle))));
            }
        }

        // one column per ptitleId: name = ptitleId, value = its frequency
        private static Mutation getMutation(Text word, int sum) {
            Column c = new Column();
            byte[] wordBytes = word.getBytes();
            c.name = ByteBuffer.wrap(Arrays.copyOf(wordBytes, wordBytes.length));
            c.value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
            c.timestamp = System.currentTimeMillis() * 1000; // microseconds

            Mutation m = new Mutation();
            m.column_or_supercolumn = new ColumnOrSuperColumn();
            m.column_or_supercolumn.column = c;
            return m;
        }
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new SentimentAnalysis(), args);
        System.exit(ret);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "SentimentAnalysis");
        job.setJarByClass(SentimentAnalysis.class);
        String inputFile = args[0];

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);

        ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
        FileInputFormat.setInputPaths(job, inputFile);

        // Cassandra connection settings (Thrift RPC)
        ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
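For comparison, below is a sketch of the same reducer written against the new-API Reducer contract, which declares reduce(KEYIN key, Iterable<VALUEIN> values, Context context); the linked WordCount example uses Iterable as well, if I remember it correctly. A method that takes an Iterator instead does not override the framework's reduce, so Hadoop falls back to the base class's identity implementation, which writes the map-side Text key straight through to the output format. This is an untested sketch that reuses the getMutation helper above:

public static class Reduce extends Reducer<Text, LongWritable, ByteBuffer, List<Mutation>> {
    @Override // compiles only if the signature really matches the base class
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // count how often each ptitleId occurs for this keyword
        java.util.Map<Long, Integer> ptitleToFrequency = new HashMap<Long, Integer>();
        for (LongWritable v : values) {
            Long ptitle = v.get();
            Integer n = ptitleToFrequency.get(ptitle);
            ptitleToFrequency.put(ptitle, n == null ? 1 : n + 1);
        }
        // only the first getLength() bytes of Text's backing array are valid
        ByteBuffer outputKey =
                ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength()));
        for (java.util.Map.Entry<Long, Integer> e : ptitleToFrequency.entrySet()) {
            context.write(outputKey, Collections.singletonList(
                    getMutation(new Text(e.getKey().toString()), e.getValue())));
        }
    }
}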
As for the key itself: if you look at the Reduce class, you will see that I do convert the Text field (the key) to a ByteBuffer.
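One subtlety worth flagging with that conversion (a Hadoop detail, not specific to Cassandra): Text.getBytes() returns the reused backing array, and only its first getLength() bytes are valid, so copying keyBytes.length bytes can drag in stale bytes left over from a previous, longer key. A bounded copy avoids that:

// Text reuses its backing array across records; bytes past getLength()
// may be leftovers from an earlier, longer value.
byte[] raw = key.getBytes();
ByteBuffer outputKey = ByteBuffer.wrap(Arrays.copyOf(raw, key.getLength()));

The same applies to word.getBytes() inside getMutation().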
I would appreciate some pointers on how to fix this.