Ошибка ClassCast при записи в Cassandra из задания hadoop - PullRequest
1 голос
/ 01 февраля 2012

Я запускаю задание hadoop и пытаюсь записать вывод в Cassandra. Я получаю следующее исключение:

java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to java.nio.ByteBuffer
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter.write(ColumnFamilyRecordWriter.java:60)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:514)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
at org.apache.hadoop.mapred.Child.main(Child.java:264)

Я смоделировал код сокращения карты на примере WordCount, указанном в https://wso2.org/repos/wso2/trunk/carbon/dependencies/cassandra/contrib/word_count/src/WordCount.java

Вот мой код MR:

public class SentimentAnalysis extends Configured implements Tool {

static final String KEYSPACE = "Travel";
static final String OUTPUT_COLUMN_FAMILY = "Keyword_PtitleId";

public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        Sentiment sentiment = null;
        try {
            sentiment = (Sentiment) PojoMapper.fromJson(line, Sentiment.class);
        } catch(Exception e) {
            return;
        }

        if(sentiment != null && sentiment.isLike()) {
            word.set(sentiment.getNormKeyword());
            context.write(word, new LongWritable(sentiment.getPtitleId()));
        }
    }
}

public static class Reduce extends Reducer<Text, LongWritable, ByteBuffer, List<Mutation>> {

    private ByteBuffer outputKey;

    public void reduce(Text key, Iterator<LongWritable> values, Context context) throws IOException, InterruptedException {

        List<Long> ptitles = new ArrayList<Long>();
        java.util.Map<Long, Integer> ptitleToFrequency = new HashMap<Long, Integer>();

        while (values.hasNext()) {
            Long value = values.next().get();
            ptitles.add(value);
        }

        for(Long ptitle : ptitles) {
            if(ptitleToFrequency.containsKey(ptitle)) {
                ptitleToFrequency.put(ptitle, ptitleToFrequency.get(ptitle) + 1);
            }

            else {
                ptitleToFrequency.put(ptitle, 1);
            }
        }

        byte[] keyBytes = key.getBytes();
        outputKey = ByteBuffer.wrap(Arrays.copyOf(keyBytes, keyBytes.length));
        for(Long ptitle : ptitleToFrequency.keySet()) {
            context.write(outputKey, Collections.singletonList(getMutation(new Text(ptitle.toString()), ptitleToFrequency.get(ptitle))));
        }
    }

    private static Mutation getMutation(Text word, int sum)
    {
        Column c = new Column();
        byte[] wordBytes = word.getBytes();
        c.name = ByteBuffer.wrap(Arrays.copyOf(wordBytes, wordBytes.length));
        c.value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
        c.timestamp = System.currentTimeMillis() * 1000;

        Mutation m = new Mutation();
        m.column_or_supercolumn = new ColumnOrSuperColumn();
        m.column_or_supercolumn.column = c;
        return m;
    }
}

  public static void main(String[] args) throws Exception {
      int ret = ToolRunner.run(new SentimentAnalysis(), args);
      System.exit(ret);
  }

@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "SentimentAnalysis");
    job.setJarByClass(SentimentAnalysis.class);

    String inputFile = args[0];

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);
    job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
    job.setInputFormatClass(TextInputFormat.class);

    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);

    FileInputFormat.setInputPaths(job, inputFile);

    ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
    ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
}

}

Если вы загляните под класс Reduce, я правильно преобразовываю текстовое поле (ключ) в ByteBuffer.

Буду признателен за некоторые указания, как это исправить.

1 Ответ

2 голосов
/ 03 февраля 2012

После некоторых проб и ошибок я смог выяснить, как решить эту конкретную проблему. По сути, в моей сигнатуре метода Reduce я использовал Iterator вместо Iterable, поэтому редуктор никогда не вызывался. И hadoop пытался записать мой вывод Mapper (Text, LongWritable) в Cassandra, используя outputKey / Value Classes для Reducer (ByteBuffer, List). Это вызывало ClassCastException.

Изменение сигнатуры метода приведения на Iterable решило эту проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...