Манипуляции со строкой редуктора Hadoop не работают - PullRequest
4 голосов
/ 01 октября 2010

Привет Манипулирование текстом в фазе уменьшения, кажется, работает неправильно.Я подозреваю, что проблема может быть в моем коде, а не в самом hasoop, но вы никогда не знаете ... Если вы можете заметить какие-либо ошибки, дайте мне знать.Я потратил впустую день, пытаясь выяснить, что не так с этим кодом.

мой пример входного файла с именем simple.psv

12345   abc@bbc.com|m|1975
12346   bbc@cde.com|m|1981

мой код Mapper и редуктора

package simplemapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;

/**
 *
 * @author
 */
public class Main {


    public static class SimpleMap extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text inputs,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            String inputString = inputs.toString();
            //System.out.println("CRM Map record:"+inputString);
            StringTokenizer tokenizer = new StringTokenizer(inputString);
            Text userid = new Text();
            if (tokenizer.hasMoreTokens()) {
                userid.set(tokenizer.nextToken());
                Text data = new Text();
                if (tokenizer.hasMoreTokens()) {
                    data.set(tokenizer.nextToken());
                } else {
                    data.set("");
                }
                output.collect(userid, data);
            }
        }
    }

    /**
     * A reducer class that just emits its input.
     */
    public static class SimpleReduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            while (values.hasNext()) {
                Text txt = values.next();
                String inputString = "<start>" + txt.toString() + "<end>";
                Text out = new Text();
                out.set(inputString);
                //System.out.println(inputString);
                output.collect(key, out);

            }
        }
    }

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {
            System.err.println("Usage: SimpleMapReduce <input path> <output path>");
            System.exit(1);
        }
        JobConf conf = new JobConf(Main.class);
        conf.setJobName("Simple Map reducer");

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(SimpleMap.class);
        conf.setCombinerClass(SimpleReduce.class);
        conf.setReducerClass(SimpleReduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(1);
        JobClient.runJob(conf);
    }
}

мой пример сценария запуска с именем simple.sh

#!/bin/bash

hadoop jar SimpleMapReduce.jar \
  /full/path/to/input/simple.tsv  /user/joebloggs/output

ожидаемый вывод

12345   <start>abc@bbc.com|m|1975<end>
12346   <start>bbc@cde.com|m|1981<end>

фактический вывод

12345   <start><start>abc@bbc.com|m|1975<end><end>
12346   <start><start>bbc@cde.com|m|1981<end><end>

Я также протестировал это на Amazon s3 в Linuxесли бы вы могли определить проблему и сообщить мне, что это ... это действительно спасет мои волосы!

Ответы [ 2 ]

4 голосов
/ 02 октября 2010

Основной поток данных через систему:

Input -> Map -> Reduce -> output.

В качестве оптимизации производительности был добавлен объединитель, позволяющий компьютеру (одному из множества в кластере hadoop) выполнять частичное агрегированиеданных перед их передачей в систему, в которой работает фактический редуктор.

В примере с подсчетом слов можно начать с следующих значений:

1 1 1 1 1 1 1 1 1 1

объединить их в

3 4 2 1

и приведение их к конечному результату

10

Таким образом, объединитель по сути является оптимизацией производительности.Если вы не укажете объединитель, он не изменит передаваемую информацию (т. Е. Это «преобразователь идентичности»).Таким образом, вы можете использовать один и тот же класс в качестве объединителя и редуктора только в том случае, если набор данных остается действительным.В вашем случае: это не так -> ваши данные теперь недействительны.

Вы делаете:

conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);

Таким образом, это приводит к тому, что вывод вашего преобразователя проходит через преобразователь дважды.Первый добавляет: «начало» и «конец» Второй добавляет «начало» и «конец» снова.

Простое решение:

// conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);

HTH

2 голосов
/ 08 марта 2011

У меня была проблема, когда редуктор не получал все данные, отправленные картографом.Редуктор получит только до определенной части output.collect будет излучать.Например,для входных данных:

12345 abc@bbc.com|m|1975
12346 bbc@cde.com|m|1981

если я говорю

output.collect (key, mail_id);

Тогда не получат следующие два поля - пол и год рождения.

// conf.setCombinerClass (SimpleReduce.class);

Решена проблема.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...