Несколько строк текста на одной карте - PullRequest
7 голосов
/ 26 апреля 2010

Я пытался использовать Hadoop для отправки N строк на одно отображение. Я не требую, чтобы линии уже были разделены.

Я пытался использовать NLineInputFormat, однако он отправляет N строк текста из данных в каждое устройство отображения по одной строке за раз [отказываясь после N-й строки].

Я попытался установить этот параметр, и он занимает только N строк ввода, отправляя его по одной строке за раз на каждую карту:

    job.setInt("mapred.line.input.format.linespermap", 10);

Я нашел список рассылки, рекомендующий переопределить LineRecordReader :: next, однако это не так просто, так как все внутренние данные являются частными.

Я только что проверил источник NLineInputFormat, и он жестко кодирует LineReader, поэтому переопределение не поможет.

Кроме того, кстати, я использую Hadoop 0.18 для совместимости с Amazon EC2 MapReduce.

Ответы [ 3 ]

7 голосов
/ 27 апреля 2010

Вы должны реализовать свой собственный формат ввода . У вас также есть возможность определить свой собственный читатель записи.

К сожалению, вы должны определить метод getSplits (). На мой взгляд, это будет сложнее, чем реализация устройства чтения записей: этот метод должен реализовывать логику для разделения входных данных.

См. Следующую выдержку из "Hadoop - Полное руководство" (отличная книга, которую я всегда рекомендую!):

Вот интерфейс:

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;
}

JobClient вызывает метод getSplits (), передавая необходимое количество задач карты в качестве аргумента numSplits. Это число рассматривается как подсказка, поскольку InputFormat реализует Ментации могут возвращать различное количество разбиений на число, указанное в numSplits. Вычислив сплиты, клиент отправляет их на трекер, который использует их места хранения для планирования задач карты для обработки их в средствах отслеживания задач.

На трекере задач задача карты передает разбиение методу getRecordReader () в InputFormat для получения RecordReader для этого разделения. RecordReader немного больше, чем итератор над записями, и задача карты использует его для генерации пар ключ-значение записи, который он передает функции карты. Фрагмент кода (на основе кода в MapRunner) иллюстрирует идею:

K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
  mapper.map(key, value, output, reporter);
} 
2 голосов
/ 29 апреля 2015

Недавно я решил эту проблему, просто создав собственный InputFormat, который переопределяет NLineInputFormat и реализует собственный MultiLineRecordReader вместо LineReader по умолчанию.

Я решил расширить NLineInputFormat, потому что хотел иметь ту же гарантию, что в каждом разделении должно быть ровно N строк.

Этот читатель записей взят почти как http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

Единственное, что я изменил, - это свойство для maxLineLength, которое теперь использует новый API, и значение для NLINESTOPROCESS, которое читается из setNumLinesPerSplit() NLineInputFormat, несмотря на жесткое кодирование (для большей гибкости).

Вот результат:

public class MultiLineInputFormat extends NLineInputFormat{
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) {
        context.setStatus(genericSplit.toString());
        return new MultiLineRecordReader();
    }

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{
        private int NLINESTOPROCESS;
        private LineReader in;
        private LongWritable key;
        private Text value = new Text();
        private long start =0;
        private long end =0;
        private long pos =0;
        private int maxLineLength;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (start == end) {
                return 0.0f;
            }
            else {
                return Math.min(1.0f, (pos - start) / (float)(end - start));
            }
        }

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException {
            NLINESTOPROCESS = getNumLinesPerSplit(context);
            FileSplit split = (FileSplit) genericSplit;
            final Path file = split.getPath();
            Configuration conf = context.getConfiguration();
            this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE);
            FileSystem fs = file.getFileSystem(conf);
            start = split.getStart();
            end= start + split.getLength();
            boolean skipFirstLine = false;
            FSDataInputStream filein = fs.open(split.getPath());

            if (start != 0){
                skipFirstLine = true;
                --start;
                filein.seek(start);
            }
            in = new LineReader(filein,conf);
            if(skipFirstLine){
                start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);
            if (value == null) {
                value = new Text();
            }
            value.clear();
            final Text endline = new Text("\n");
            int newSize = 0;
            for(int i=0;i<NLINESTOPROCESS;i++){
                Text v = new Text();
                while (pos < end) {
                    newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength));
                    value.append(v.getBytes(),0, v.getLength());
                    value.append(endline.getBytes(),0, endline.getLength());
                    if (newSize == 0) {
                        break;
                    }
                    pos += newSize;
                    if (newSize < maxLineLength) {
                        break;
                    }
                }
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }
    }

}
1 голос
/ 23 марта 2011

Я думаю, что в вашем случае вы можете следовать шаблону делегирования и реализовать обертку вокруг LineRecordReader, которая переопределяет необходимые методы, т.е. next () (или nextKeyValue () в новом API), чтобы установить значение для конкатенации из N строк, а не одна линия.

Я погуглил примерную реализацию ParagraphRecordReader, которая использует LineRecordReader для считывания входных данных построчно (и конкатенации) до появления либо EOF, либо пустой строки. Затем он возвращает пару, где значением является абзац (вместо одной строки). Более того, ParagraphInputFormat для этого ParagraphRecordReader так же прост, как и стандартный TextInputFormat.

Вы можете найти необходимые ссылки на эту реализацию и пару слов об этом в следующем посте: http://hadoop -mapreduce.blogspot.com / 2011/03 / little-более -ложн-recordreaders.html .

Лучшее

...