Несоответствие типа в ключе с карты: ожидается .. Текст, получено ... LongWritable - PullRequest
6 голосов
/ 17 декабря 2011

У меня есть простое приложение hadoop, которое получает один CSV-файл, затем разделяет запись на «,», а затем подсчитывает первые элементы.

Ниже приведен мой код.

package com.bluedolphin;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {
    private final static LongWritable one = new LongWritable(1);


    public static class MapClass extends Mapper<Object, Text, Text, LongWritable> {
        private Text word = new Text();
        public void map(Object key, 
                    Text value, 
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException, InterruptedException {
            String[] citation = value.toString().split(",");
            word.set(citation[0]);
            output.collect(word, one);
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(
                Text key, 
                Iterator<LongWritable> values, 
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException, InterruptedException {
            int sum = 0;

            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }
    public static class Combiner extends Reducer<Text, IntWritable, Text, LongWritable> {
        public void reduce(
                Text key, 
                Iterator<LongWritable> values, 
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException, InterruptedException {
            int sum = 0;

            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));

        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "MyJob");
        job.setJarByClass(MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
    //  job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class);
    //  job.setInputFormatClass(KeyValueInputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
    //  job.setOutputFormatClass(KeyValueOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String args[]) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}


Это ошибка:

11/12/16 22:16:58 INFO mapred.JobClient: Task Id : attempt_201112161948_0005_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1013)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)

Ответы [ 3 ]

8 голосов
/ 17 декабря 2011

Пара вещей, которые будут исправлены в коде

  1. Старый (o.a.h.mapred) и новый API (o.a.h.mapreduce) несовместимы, поэтому их не следует смешивать.

import org.apache.hadoop.mapred.OutputCollector;  
import org.apache.hadoop.mapred.Reporter;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;
  1. Убедитесь, что вход / выход для картографов / редукторов имеют тип o.a.h.io.Writable. Ключ ввода для Mapper - Object, сделайте его LongWritable.

  2. Похоже, функции Combiner и Reducer одинаковы, поэтому вам не нужно повторять это.

job.setCombinerClass(Reducer.class);

Кроме того, вы можете использовать пример WordCount , разница между вашим требованием и примером WordCount невелика.

5 голосов
/ 09 апреля 2012

Общее примечание: если у нас есть Mapper<K1,V1, K2,V2> и Reducer<K2,V2, K3,V3>, лучше объявить (в Задании) следующее

JobConf conf = new JobConf(MyJob.class);
...
conf.setMapOutputKeyClass(K2.class);
conf.setMapOutputValueClass(V2.class);

Вы можете увидеть другой пример здесь .

0 голосов
/ 30 апреля 2014

Старый API (o.a.h.mapred) и новый API (o.a.h.mapreduce) несовместимы, поэтому их не следует смешивать.

import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

Вам следует попробовать заменить OutputCollector и Reporter в Map и уменьшить функции подписи на Context. map (ключ K1, V1 val, контекст Context) и output.collect (k, v) с context.write (k, v)

Для справки используйте эту ссылку с более подробной информацией о переходе на новый API http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api#

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...