FileInputFormat, где filename - KEY, а текстовое содержимое - VALUE. - PullRequest
4 голосов
/ 04 мая 2011

Я хотел бы использовать весь файл как отдельную запись для обработки MAP с именем файла в качестве ключа.
Я прочитал следующий пост: Как получить имя файла / содержимое файлакак ввод ключа / значения для MAP при запуске задания Hadoop MapReduce?
, и хотя теория верхнего ответа является твердой, никакого кода или практических рекомендаций фактически не предоставлено.

ЗдесьЭто мой пользовательский FileInputFormat и соответствующие RecordReader, которые компилируются, но не дают ЛЮБЫХ данных записи.
Спасибо за любую помощь.

public class CommentsInput
    extends FileInputFormat<Text,Text> {
protected boolean isSplitable(FileSystem fs, Path filename)
{
    return false;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext ctx)
        throws IOException, InterruptedException {
    return new CommentFileRecordReader((FileSplit) split, ctx.getConfiguration());
}

/////////////////////////

public class CommentFileRecordReader
    extends RecordReader<Text,Text> {
private InputStream in;
private long start;
private long length;
private long position;
private Text key;
private Text value;
private boolean processed;
private FileSplit fileSplit;
private Configuration conf;

public CommentFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException
{
    this.fileSplit = fileSplit;
    this.conf=conf;
}

/** Boilerplate initialization code for file input streams. */
@Override
public void initialize(InputSplit split,
                     TaskAttemptContext context)
                        throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    fileSplit = (FileSplit) split;
    this.start = fileSplit.getStart();
    this.length = fileSplit.getLength();
    this.position = 0;
    this.processed = false;

    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(conf);
    FSDataInputStream in = fs.open(path);

    CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
    CompressionCodec codec = codecs.getCodec(path);
    if (codec != null)
        this.in = codec.createInputStream(in);
    else
        this.in = in;

    // If using Writables:
    // key = new Text();
    // value = new Text();
}
public boolean next(Text key, Text value) throws IOException
{
    if(!processed)
    {
        key = new Text(fileSplit.getPath().toString());
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        byte[] contents = new byte[(int) fileSplit.getLength()];
        try
        {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents.toString());
        }
        finally
        {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }
    return false;
}

@Override
public boolean nextKeyValue() throws IOException {
    // TODO parse the next key value, update position and return true.
    return false;
}

@Override
public Text getCurrentKey() {
    return key;
}

@Override
public Text getCurrentValue() {
    return value;
}

/** Returns our progress within the split, as a float between 0 and 1. */
@Override
public float getProgress() {
    if (length == 0)
        return 0.0f;
    return Math.min(1.0f, position / (float)length);
}

@Override
public void close() throws IOException {
    if (in != null)
        in.close();
}
}  

1 Ответ

1 голос
/ 31 августа 2012

Вам нужно найти способ определить свой собственный ключевой класс и убедиться, что ваши классы его используют.Вы можете посмотреть, как определить свой собственный класс ключей, и получить имя файла, вызвав метод hte getName() для его пути, а затем использовать его для создания своего ключа.

...