Mapfile как входные данные для задания MapReduce - PullRequest
3 голосов
/ 21 января 2011

Недавно я начал использовать Hadoop, и у меня возникла проблема при использовании Mapfile в качестве входных данных для задания MapReduce.

В следующем рабочем коде записывается простой MapFile с именем «TestMap» в hdf, где естьтри ключа типа Text и три значения типа BytesWritable.

Вот содержимое TestMap:

$ hadoop fs  -text /user/hadoop/TestMap/data
11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor
A    01
B    02
C    03

Вот программа, которая создает MapMile файла TestMap:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;

public class CreateMap {

    public static void main(String[] args) throws IOException{

        Configuration conf = new Configuration();
        FileSystem hdfs  = FileSystem.get(conf);

        Text key = new Text();
        BytesWritable value = new BytesWritable();
        byte[] data = {1, 2, 3};
        String[] strs = {"A", "B", "C"};
        int bytesRead;
        MapFile.Writer writer = null;

        writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), value.getClass());
        try {
            for (int i = 0; i < 3; i++) {
                key.set(strs[i]);
                value.set(data, i, 1);
                writer.append(key, value);
                System.out.println(strs[i] + ":" + data[i] + " added.");
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
             IOUtils.closeStream(writer);
        }
    }
}

Следующее простое задание MapReduce пытается увеличить на единицу значения файла карты:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.BytesWritable;


public class AddOne extends Configured implements Tool {

    public static class MapClass extends MapReduceBase

        implements Mapper<Text, BytesWritable, Text, Text> {

        public void map(Text key, BytesWritable value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {


            byte[] data = value.getBytes();
            data[0] += 1;
            value.set(data, 0, 1);
            output.collect(key, new Text(value.toString()));
        }
    }

    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {

            output.collect(key, values.next());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, AddOne.class);

        Path in = new Path("TestMap");
        Path out = new Path("output");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("AddOne");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ":");


        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AddOne(), args);

        System.exit(res);
    }
}

Исключение времени выполнения, которое я получаю:

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable
    at AddOne$MapClass.map(AddOne.java:32)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

Я не понимаюпочему hadoop пытается привести LongWritable, поскольку в своем коде я правильно определяю интерфейс Mapper (Mapper<Text, BytesWritable, Text, Text>).

Может ли кто-нибудь мне помочь?

Большое спасибо

Luca

Ответы [ 3 ]

15 голосов
/ 23 января 2011

Ваша проблема связана с тем, что, несмотря на то, что говорит вам имя, MapFile является , а не файлом.

Файл MapFile на самом деле является каталогом, которыйсостоит из двух файлов: есть файл «data», который представляет собой SequenceFile , содержащий ключи и значения, которые вы в него записываете;тем не менее, существует также «индексный» файл, который представляет собой другой файл SequenceFile, содержащий подпоследовательность ключей вместе с их смещениями в виде LongWritables;этот индекс загружается в память с помощью MapFile.Reader, чтобы вы могли быстро выполнить двоичный поиск, чтобы найти смещение в файле данных, в котором будут данные, которые вы хотите получить при произвольном доступе.

Вы используете старый «org.apache.hadoop.mapred» версия SequenceFileInputFormat .Он недостаточно умен, чтобы знать, что смотреть на файл данных можно только тогда, когда вы говорите ему смотреть на MapFile как на вход;вместо этого он фактически пытается использовать файл данных и индексный файл в качестве обычных входных файлов.Файл данных будет работать правильно, потому что классы согласуются с тем, что вы укажете, но файл индекса выдаст исключение ClassCastException, поскольку все значения файла индекса - это LongWritables.1017 * "org.apache.hadoop.mapreduce" версия SequenceFileInputFormat (таким образом изменяя другие части вашего кода), которая знает достаточно о MapFiles, чтобы просто взглянуть на файл данных;или вместо этого вы можете явно указать файл данных в качестве файла, который вы хотите использовать в качестве входных данных.

0 голосов
/ 07 июня 2012

Я решил ту же проблему, используя KeyValueTextInputFormat.class

Упомяну весь подход в

http://sanketraut.blogspot.in/2012/06/hadoop-example-setting-up-hadoop-on.html

0 голосов
/ 04 мая 2011

Одним из подходов может быть использование пользовательского InputFormat с одной записью для всего блока MapFile и поиск по ключу из map ()

...