Доступ к счетчику картографа из редуктора - PullRequest
12 голосов
/ 27 марта 2011

Мне нужен доступ к счетчикам из моего картографа в моем редукторе.Это возможно?Если да, то как это сделать?

Например: мой маппер:

public class CounterMapper extends Mapper<Text,Text,Text,Text> {

    static enum TestCounters { TEST }

    @Override
    protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
        context.getCounter(TestCounters.TEST).increment(1);
        context.write(key, value);
    }
}

Мой редуктор

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
        Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
        long counterValue = counter.getValue();
        context.write(key, new LongWritable(counterValue));
    }
}

counterValue всегда равно 0. Я делаючто-то не так или это просто невозможно?

Ответы [ 6 ]

11 голосов
/ 19 июля 2011

В конфигурации редуктора (JobConf) вы можете использовать объект JobConf для поиска собственного идентификатора задания редуктора.С этим ваш редуктор может создать свой собственный JobClient - то есть соединение с JobTracker - и запросить счетчики для этой работы (или любой работы по этому вопросу).

// in the Reducer class...
private long mapperCounter;

@Override
public void configure(JobConf conf) {
    JobClient client = new JobClient(conf);
    RunningJob parentJob = 
        client.getJob(JobID.forName( conf.get("mapred.job.id") ));
    mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
}

Теперь вы можете использовать mapperCounterвнутри самого метода Reduce ().

Здесь вам действительно нужна попытка перехвата.Я использую старый API, но его не должно быть сложно адаптировать к новому API.

Обратите внимание, что все счетчики мапперов должны быть завершены перед запуском любого редуктора, так что вопреки комментарию Джастина Томасасчитаю, что вы должны получить точные значения (если редукторы не увеличивают один и тот же счетчик!)

8 голосов
/ 13 мая 2014

Реализовано решение Джеффа Г на новом API:

    @Override
    public void setup(Context context) throws IOException, InterruptedException{
        Configuration conf = context.getConfiguration();
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();  
    }
2 голосов
/ 27 марта 2011

Смысл карты / сокращения заключается в распараллеливании заданий.Будет много уникальных картографов / редукторов, поэтому значение будет неправильным в любом случае, за исключением запуска карты / редукции.

У них есть пример подсчета слов:

http://wiki.apache.org/hadoop/WordCount

Вы можете изменить context.write (word, one) на context.write (line, one)

1 голос
/ 26 января 2016

Я задал этот вопрос , но я не решил свою проблему.Однако альтернативное решение пришло мне в голову.В маппере подсчитывается количество слов, и его можно записать в промежуточный вывод с минимальным ключом (чтобы это значение было в голове) в функции очистки, которая запускает конец маппера.В редукторе количество слов рассчитывается путем сложения значений в голове.Пример кода и его часть доступны ниже.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by tolga on 1/26/16.
 */
public class WordCount {
    static enum TestCounters { TEST }
    public static class Map extends Mapper<Object, Text, Text, LongWritable> {
        private final static LongWritable one = new LongWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
                context.getCounter(TestCounters.TEST).increment(1);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new Text("!"),new LongWritable(context.getCounter(TestCounters.TEST).getValue()));
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = new Job(conf, "WordCount");
        job.setJarByClass(WordCount.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Текстовый файл:

Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal.

Промежуточный вывод

**!	33**
2008	1
Action	1
Ankara,	1
Foundation	1
It	1
Thought	1
Turgut	1
Turgut	1
Turgut	1

**!	33**
2008	1
Action	1
Ankara,	1
Foundation	1
It	1
Thought	1
Turgut	3
1 голос
/ 27 марта 2011

Значения глобального счетчика никогда не передаются обратно каждому преобразователю или преобразователю. Если вы хотите, чтобы # число картографических записей было доступно редуктору, вам потребуется полагаться на какой-то внешний механизм для этого.

0 голосов
/ 29 августа 2017

Улучшение от ответа Ицхаки

findCounter(COUNTER_NAME) больше не поддерживается - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html

@Override
public void setup(Context context) throws IOException, InterruptedException{
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    Job currentJob = cluster.getJob(context.getJobID());
    mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue();  
}

GROUP_NAME указывается при вызове счетчика. например,

context.getCounter("com.example.mycode", "MY_COUNTER").increment(1);

тогда

mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue();  

Кроме того, один важный момент: если счетчик не существует, он инициализирует его значением 0.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...